Changes from all commits
16 commits
297a3be
feat(schema): add TInsertOnlyStrategyDict type for insert-only scope
mattiasthalen Apr 8, 2026
cc718a7
feat(extract): extract insert-only scope hint from write disposition …
mattiasthalen Apr 8, 2026
aa3ab96
feat(destinations): add previous_load scope filter to insert-only mer…
mattiasthalen Apr 8, 2026
492210b
test(load): add tests for insert-only scope=previous_load
mattiasthalen Apr 8, 2026
7c79fc2
fix(destinations): use pre-filter staging + MERGE ON FALSE for insert…
mattiasthalen Apr 8, 2026
f0126d4
feat(iceberg): add previous_load scope to iceberg insert-only merge
mattiasthalen Apr 8, 2026
daa7344
feat(delta): add previous_load scope to delta insert-only merge
mattiasthalen Apr 8, 2026
c9e0c3e
fix(iceberg): use previous_load_id from loads table and append after …
mattiasthalen Apr 8, 2026
9ac4f35
fix(review): remove iceberg fallback and fix composite key separator
mattiasthalen Apr 8, 2026
21085df
test(load): add nested table and hard-delete tests for insert-only scope
mattiasthalen Apr 8, 2026
c687340
fix(destinations): fix nested table scope SQL and harden load_id parsing
mattiasthalen Apr 8, 2026
4ae2b63
fix(review): use C_DLT_LOAD_ID constant and document root-table-only …
mattiasthalen Apr 8, 2026
81d2b3f
style: apply black formatting
mattiasthalen Apr 9, 2026
b499361
fix(review): MSSQL compat, composite key portability, LanceDB gate
mattiasthalen Apr 9, 2026
b5c3658
fix(extract): validate insert-only scope values at resource definitio…
mattiasthalen Apr 9, 2026
52a5c6a
fix(filesystem): apply previous_load scope to nested table-format loads
mattiasthalen Apr 9, 2026
117 changes: 115 additions & 2 deletions dlt/common/libs/deltalake.py
@@ -8,7 +8,7 @@
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.pyarrow import cast_arrow_schema_types
from dlt.common.libs.utils import load_open_tables
from dlt.common.schema.typing import TWriteDisposition, TTableSchema
from dlt.common.schema.typing import TWriteDisposition, TTableSchema, C_DLT_LOAD_ID
from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
from dlt.common.exceptions import MissingDependencyException, ValueErrorWithKnownValues
from dlt.common.typing import DictStrAny
@@ -114,12 +114,107 @@ def write_delta_table(
)


def _filter_by_previous_load(
table: DeltaTable,
source_data: Union[pa.Table, pa.RecordBatchReader],
schema: TTableSchema,
previous_load_id: Optional[str],
root_table: Optional[DeltaTable] = None,
) -> pa.Table:
"""Remove from source rows whose keys exist in the previous load's target data."""
if isinstance(source_data, pa.RecordBatchReader):
source_data = source_data.read_all()

if previous_load_id is None:
return source_data

key_cols: List[str]
if "parent" in schema:
key_cols = [get_first_column_name_with_prop(schema, "unique")]
else:
key_cols = get_columns_names_with_prop(schema, "primary_key")

if "parent" in schema and root_table is not None:
# child tables lack _dlt_load_id; join through root table via root_key
root_key_col = get_first_column_name_with_prop(schema, "root_key")
prev_load_keys = _get_child_keys_from_previous_load(
table, root_table, key_cols, root_key_col, previous_load_id
)
else:
target_arrow = table.to_pyarrow_table(columns=key_cols + [C_DLT_LOAD_ID])
if target_arrow.num_rows == 0:
return source_data
prev_load_keys = target_arrow.filter(
pa.compute.equal(target_arrow.column(C_DLT_LOAD_ID), previous_load_id)
).select(key_cols)

if prev_load_keys.num_rows == 0:
return source_data

return _anti_join(source_data, prev_load_keys, key_cols)


def _get_child_keys_from_previous_load(
child_table: DeltaTable,
root_table: DeltaTable,
key_cols: List[str],
root_key_col: str,
previous_load_id: str,
) -> pa.Table:
"""Get child table keys that belong to root rows from the previous load."""
from dlt.common.schema.typing import C_DLT_ID

root_arrow = root_table.to_pyarrow_table(columns=[C_DLT_ID, C_DLT_LOAD_ID])
if root_arrow.num_rows == 0:
return pa.table({c: pa.array([], type=pa.string()) for c in key_cols})

prev_root_ids = root_arrow.filter(
pa.compute.equal(root_arrow.column(C_DLT_LOAD_ID), previous_load_id)
).column(C_DLT_ID)

if len(prev_root_ids) == 0:
return pa.table({c: pa.array([], type=pa.string()) for c in key_cols})

child_arrow = child_table.to_pyarrow_table(columns=key_cols + [root_key_col])
if child_arrow.num_rows == 0:
return pa.table({c: pa.array([], type=pa.string()) for c in key_cols})

mask = pa.compute.is_in(child_arrow.column(root_key_col), value_set=prev_root_ids)
return child_arrow.filter(mask).select(key_cols)


def _anti_join(
source_data: pa.Table, prev_load_keys: pa.Table, key_cols: List[str]
) -> pa.Table:
"""Remove rows from source_data whose key columns match prev_load_keys."""
if len(key_cols) == 1:
col = key_cols[0]
mask = pa.compute.invert(
pa.compute.is_in(source_data.column(col), value_set=prev_load_keys.column(col))
)
return source_data.filter(mask)

def concat_keys(tbl: pa.Table, cols: List[str]) -> pa.Array:
arrays = [pa.compute.cast(tbl.column(c), pa.string()) for c in cols]
result = arrays[0]
for arr in arrays[1:]:
result = pa.compute.binary_join_element_wise(result, arr, "\x00")
return result

prev_concat = concat_keys(prev_load_keys, key_cols)
source_concat = concat_keys(source_data, key_cols)
mask = pa.compute.invert(pa.compute.is_in(source_concat, value_set=prev_concat))
return source_data.filter(mask)


def merge_delta_table(
table: DeltaTable,
data: Union[pa.Table, pa.RecordBatchReader],
schema: TTableSchema,
load_table_name: str,
streamed_exec: bool,
previous_load_id: Optional[str] = None,
root_table: Optional[DeltaTable] = None,
) -> None:
"""Merges in-memory Arrow data into on-disk Delta table."""

@@ -135,8 +230,26 @@ def merge_delta_table(
predicate = " AND ".join([f"target.{c} = source.{c}" for c in primary_keys])

partition_by = get_columns_names_with_prop(schema, "partition")
source_data = ensure_delta_compatible_arrow_data(data, partition_by)

if strategy == "insert-only" and schema.get("x-insert-only-scope") == "previous_load":
source_data = _filter_by_previous_load(
table, source_data, schema, previous_load_id, root_table
)
if source_data.num_rows == 0:
return
# force-insert all remaining rows without dedup: equivalent to SQL's "MERGE ON FALSE"
table.merge(
source=source_data,
predicate="1=0",
source_alias="source",
target_alias="target",
streamed_exec=streamed_exec,
).when_not_matched_insert_all().execute()
return

qry = table.merge(
source=ensure_delta_compatible_arrow_data(data, partition_by),
source=source_data,
predicate=predicate,
source_alias="source",
target_alias="target",
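
To make the pre-filtering concrete: a minimal standalone sketch (not part of the diff) of the anti-join that _filter_by_previous_load performs, assuming only that pyarrow is installed; the table contents and the "id"/"sub_id" column names are invented. Composite keys are compared by casting each part to string and joining the parts with a NUL separator, mirroring _anti_join above.

import pyarrow as pa
import pyarrow.compute as pc

# rows staged for the current load
source = pa.table({"id": [1, 2, 3], "sub_id": ["a", "b", "c"]})
# key values already written by the previous load
prev_keys = pa.table({"id": [2, 9], "sub_id": ["b", "z"]})

def concat_keys(tbl: pa.Table, cols: list) -> pa.ChunkedArray:
    # cast each key part to string and join with NUL so ("ab", "c") != ("a", "bc")
    arrays = [pc.cast(tbl.column(c), pa.string()) for c in cols]
    result = arrays[0]
    for arr in arrays[1:]:
        result = pc.binary_join_element_wise(result, arr, "\x00")
    return result

cols = ["id", "sub_id"]
mask = pc.invert(pc.is_in(concat_keys(source, cols), value_set=concat_keys(prev_keys, cols)))
print(source.filter(mask).to_pydict())  # {'id': [1, 3], 'sub_id': ['a', 'c']}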
101 changes: 100 additions & 1 deletion dlt/common/libs/pyiceberg.py
@@ -14,7 +14,7 @@
from dlt.common.libs.pyarrow import cast_arrow_schema_types
from dlt.common.libs.utils import load_open_tables
from dlt.common.pipeline import SupportsPipeline
from dlt.common.schema.typing import TWriteDisposition, TTableSchema
from dlt.common.schema.typing import C_DLT_LOAD_ID, TWriteDisposition, TTableSchema
from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
from dlt.common.utils import assert_min_pkg_version
from dlt.common.exceptions import MissingDependencyException
Expand All @@ -29,11 +29,13 @@
from pyiceberg.table import Table as IcebergTable
from pyiceberg.catalog import Catalog as IcebergCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import EqualTo
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec as IcebergPartitionSpec,
)
import pyarrow as pa
import pyarrow.compute as pc
from pydantic import BaseModel, ConfigDict, Field
except ModuleNotFoundError:
raise MissingDependencyException(
@@ -88,11 +90,93 @@ def write_iceberg_table(
)


def _filter_by_previous_load_iceberg(
table: IcebergTable,
source_data: pa.Table,
key_cols: List[str],
previous_load_id: Optional[str] = None,
root_table: Optional[IcebergTable] = None,
root_key_col: Optional[str] = None,
) -> pa.Table:
"""Remove from source rows whose keys exist in the previous load's target data."""
if previous_load_id is None:
return source_data

if root_table is not None and root_key_col is not None:
# child tables lack _dlt_load_id; join through root table via root_key
prev_load_keys = _get_child_keys_from_previous_load_iceberg(
table, root_table, key_cols, root_key_col, previous_load_id
)
else:
prev_load_keys = table.scan(
selected_fields=tuple(key_cols),
row_filter=EqualTo(C_DLT_LOAD_ID, previous_load_id),
).to_arrow()

if prev_load_keys.num_rows == 0:
return source_data

return _anti_join_iceberg(source_data, prev_load_keys, key_cols)


def _get_child_keys_from_previous_load_iceberg(
child_table: IcebergTable,
root_table: IcebergTable,
key_cols: List[str],
root_key_col: str,
previous_load_id: str,
) -> pa.Table:
"""Get child table keys that belong to root rows from the previous load."""
from dlt.common.schema.typing import C_DLT_ID

prev_root_ids = root_table.scan(
selected_fields=(C_DLT_ID,),
row_filter=EqualTo(C_DLT_LOAD_ID, previous_load_id),
).to_arrow().column(C_DLT_ID)

if len(prev_root_ids) == 0:
return pa.table({c: pa.array([], type=pa.string()) for c in key_cols})

child_arrow = child_table.scan(
selected_fields=tuple(key_cols) + (root_key_col,),
).to_arrow()

if child_arrow.num_rows == 0:
return pa.table({c: pa.array([], type=pa.string()) for c in key_cols})

mask = pc.is_in(child_arrow.column(root_key_col), value_set=prev_root_ids)
return child_arrow.filter(mask).select(key_cols)


def _anti_join_iceberg(
source_data: pa.Table, prev_load_keys: pa.Table, key_cols: List[str]
) -> pa.Table:
"""Remove rows from source_data whose key columns match prev_load_keys."""
if len(key_cols) == 1:
col = key_cols[0]
mask = pc.invert(pc.is_in(source_data.column(col), value_set=prev_load_keys.column(col)))
return source_data.filter(mask)

def concat_keys(tbl: pa.Table, cols: List[str]) -> pa.Array:
arrays = [pc.cast(tbl.column(c), pa.string()) for c in cols]
result = arrays[0]
for arr in arrays[1:]:
result = pc.binary_join_element_wise(result, arr, "\x00")
return result

prev_concat = concat_keys(prev_load_keys, key_cols)
source_concat = concat_keys(source_data, key_cols)
mask = pc.invert(pc.is_in(source_concat, value_set=prev_concat))
return source_data.filter(mask)


def merge_iceberg_table(
table: IcebergTable,
data: pa.Table,
schema: TTableSchema,
load_table_name: str,
previous_load_id: Optional[str] = None,
root_table: Optional[IcebergTable] = None,
) -> None:
"""Merges in-memory Arrow data into on-disk Iceberg table."""
strategy = schema["x-merge-strategy"] # type: ignore[typeddict-item]
@@ -106,6 +190,21 @@
else:
join_cols = get_columns_names_with_prop(schema, "primary_key")

if strategy == "insert-only" and schema.get("x-insert-only-scope") == "previous_load":
root_key_col = (
get_first_column_name_with_prop(schema, "root_key")
if "parent" in schema
else None
)
data = _filter_by_previous_load_iceberg(
table, data, join_cols, previous_load_id, root_table, root_key_col
)
if data.num_rows == 0:
return
# append directly — upsert would match keys against all history
table.append(ensure_iceberg_compatible_arrow_data(data))
return

# TODO: replace the batching method with transaction with pyiceberg's release after 0.9.1
for rb in data.to_batches(max_chunksize=1_000):
batch_tbl = pa.Table.from_batches([rb])
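
The nested-table branch in both deltalake.py and pyiceberg.py follows the same shape; a pyarrow-only sketch (not the PR's code, with invented data) of how a child table that has no _dlt_load_id column is scoped through its root table's _dlt_id / root key:

import pyarrow as pa
import pyarrow.compute as pc

# root table rows carry the load id that wrote them
root = pa.table({
    "_dlt_id": ["r1", "r2", "r3"],
    "_dlt_load_id": ["1700000000.1", "1700000000.1", "1700000100.2"],
})
# child table rows carry only a key back to their root row
child = pa.table({
    "_dlt_id": ["c1", "c2", "c3"],
    "_dlt_root_id": ["r1", "r2", "r3"],
})

previous_load_id = "1700000000.1"

# 1. root row ids written by the previous load
prev_root_ids = root.filter(
    pc.equal(root.column("_dlt_load_id"), previous_load_id)
).column("_dlt_id")

# 2. child keys whose root row belongs to that load; these feed the anti-join
prev_child_keys = child.filter(
    pc.is_in(child.column("_dlt_root_id"), value_set=prev_root_ids)
).select(["_dlt_id"])

print(prev_child_keys.to_pydict())  # {'_dlt_id': ['c1', 'c2']}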
7 changes: 7 additions & 0 deletions dlt/common/schema/typing.py
@@ -250,11 +250,13 @@ class NormalizerInfo(TypedDict, total=True):
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert", "insert-only"]
TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TInsertOnlyScope = Literal["previous_load"]


WRITE_DISPOSITIONS: Sequence[TWriteDisposition] = sorted(get_args(TWriteDisposition))
MERGE_STRATEGIES: Sequence[TLoaderMergeStrategy] = sorted(get_args(TLoaderMergeStrategy))
REPLACE_STRATEGIES: Sequence[TLoaderReplaceStrategy] = sorted(get_args(TLoaderReplaceStrategy))
INSERT_ONLY_SCOPES: Sequence[TInsertOnlyScope] = sorted(get_args(TInsertOnlyScope))

DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""
@@ -279,12 +281,17 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False):
row_version_column_name: Optional[str]


class TInsertOnlyStrategyDict(TMergeDispositionDict, total=False):
scope: Optional[TInsertOnlyScope]


TWriteDispositionConfig = Union[
TWriteDisposition,
TWriteDispositionDict,
TMergeDispositionDict,
TScd2StrategyDict,
TDeleteInsertStrategyDict,
TInsertOnlyStrategyDict,
]


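For reference, a hypothetical usage sketch of the scope added by TInsertOnlyStrategyDict; the dict keys follow the types above, but the example resource and column names are made up and not verified against a released dlt version:

import dlt

@dlt.resource(
    primary_key="id",
    write_disposition={
        "disposition": "merge",
        "strategy": "insert-only",
        "scope": "previous_load",  # new TInsertOnlyScope value: dedupe only against the previous load
    },
)
def events():
    yield [{"id": 1, "value": "a"}]
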
10 changes: 10 additions & 0 deletions dlt/destinations/impl/ducklake/ducklake.py
@@ -87,6 +87,11 @@ def gen_upsert_merge_sql(
deleted_cond: Optional[str],
insert_only: bool = False,
not_deleted_cond: Optional[str] = None,
scope: Optional[str] = None,
dlt_load_id_col: Optional[str] = None,
loads_table_name: Optional[str] = None,
escaped_load_id: Optional[str] = None,
escaped_status: Optional[str] = None,
) -> List[str]:
"""Generate MERGE statement without DELETE clause + separate DELETE for hard deletes."""
# insert-only: no DuckLake-specific workaround needed
@@ -100,6 +105,11 @@
deleted_cond,
insert_only=True,
not_deleted_cond=not_deleted_cond,
scope=scope,
dlt_load_id_col=dlt_load_id_col,
loads_table_name=loads_table_name,
escaped_load_id=escaped_load_id,
escaped_status=escaped_status,
)

# upsert: get MERGE without DELETE clause, then add separate DELETE for hard deletes