diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py index 6a28e04cef..9a6cc02056 100644 --- a/dlt/common/libs/deltalake.py +++ b/dlt/common/libs/deltalake.py @@ -8,7 +8,7 @@ from dlt.common.libs.pyarrow import pyarrow as pa from dlt.common.libs.pyarrow import cast_arrow_schema_types from dlt.common.libs.utils import load_open_tables -from dlt.common.schema.typing import TWriteDisposition, TTableSchema +from dlt.common.schema.typing import TWriteDisposition, TTableSchema, C_DLT_LOAD_ID from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop from dlt.common.exceptions import MissingDependencyException, ValueErrorWithKnownValues from dlt.common.typing import DictStrAny @@ -114,12 +114,107 @@ def write_delta_table( ) +def _filter_by_previous_load( + table: DeltaTable, + source_data: Union[pa.Table, pa.RecordBatchReader], + schema: TTableSchema, + previous_load_id: Optional[str], + root_table: Optional[DeltaTable] = None, +) -> pa.Table: + """Remove from source rows whose keys exist in the previous load's target data.""" + if isinstance(source_data, pa.RecordBatchReader): + source_data = source_data.read_all() + + if previous_load_id is None: + return source_data + + key_cols: List[str] + if "parent" in schema: + key_cols = [get_first_column_name_with_prop(schema, "unique")] + else: + key_cols = get_columns_names_with_prop(schema, "primary_key") + + if "parent" in schema and root_table is not None: + # child tables lack _dlt_load_id; join through root table via root_key + root_key_col = get_first_column_name_with_prop(schema, "root_key") + prev_load_keys = _get_child_keys_from_previous_load( + table, root_table, key_cols, root_key_col, previous_load_id + ) + else: + target_arrow = table.to_pyarrow_table(columns=key_cols + [C_DLT_LOAD_ID]) + if target_arrow.num_rows == 0: + return source_data + prev_load_keys = target_arrow.filter( + pa.compute.equal(target_arrow.column(C_DLT_LOAD_ID), previous_load_id) + ).select(key_cols) + + if prev_load_keys.num_rows == 0: + return source_data + + return _anti_join(source_data, prev_load_keys, key_cols) + + +def _get_child_keys_from_previous_load( + child_table: DeltaTable, + root_table: DeltaTable, + key_cols: List[str], + root_key_col: str, + previous_load_id: str, +) -> pa.Table: + """Get child table keys that belong to root rows from the previous load.""" + from dlt.common.schema.typing import C_DLT_ID + + root_arrow = root_table.to_pyarrow_table(columns=[C_DLT_ID, C_DLT_LOAD_ID]) + if root_arrow.num_rows == 0: + return pa.table({c: pa.array([], type=pa.string()) for c in key_cols}) + + prev_root_ids = root_arrow.filter( + pa.compute.equal(root_arrow.column(C_DLT_LOAD_ID), previous_load_id) + ).column(C_DLT_ID) + + if len(prev_root_ids) == 0: + return pa.table({c: pa.array([], type=pa.string()) for c in key_cols}) + + child_arrow = child_table.to_pyarrow_table(columns=key_cols + [root_key_col]) + if child_arrow.num_rows == 0: + return pa.table({c: pa.array([], type=pa.string()) for c in key_cols}) + + mask = pa.compute.is_in(child_arrow.column(root_key_col), value_set=prev_root_ids) + return child_arrow.filter(mask).select(key_cols) + + +def _anti_join( + source_data: pa.Table, prev_load_keys: pa.Table, key_cols: List[str] +) -> pa.Table: + """Remove rows from source_data whose key columns match prev_load_keys.""" + if len(key_cols) == 1: + col = key_cols[0] + mask = pa.compute.invert( + pa.compute.is_in(source_data.column(col), value_set=prev_load_keys.column(col)) + ) + return source_data.filter(mask) + + def concat_keys(tbl: pa.Table, cols: List[str]) -> pa.Array: + arrays = [pa.compute.cast(tbl.column(c), pa.string()) for c in cols] + result = arrays[0] + for arr in arrays[1:]: + result = pa.compute.binary_join_element_wise(result, arr, "\x00") + return result + + prev_concat = concat_keys(prev_load_keys, key_cols) + source_concat = concat_keys(source_data, key_cols) + mask = pa.compute.invert(pa.compute.is_in(source_concat, value_set=prev_concat)) + return source_data.filter(mask) + + def merge_delta_table( table: DeltaTable, data: Union[pa.Table, pa.RecordBatchReader], schema: TTableSchema, load_table_name: str, streamed_exec: bool, + previous_load_id: Optional[str] = None, + root_table: Optional[DeltaTable] = None, ) -> None: """Merges in-memory Arrow data into on-disk Delta table.""" @@ -135,8 +230,26 @@ def merge_delta_table( predicate = " AND ".join([f"target.{c} = source.{c}" for c in primary_keys]) partition_by = get_columns_names_with_prop(schema, "partition") + source_data = ensure_delta_compatible_arrow_data(data, partition_by) + + if strategy == "insert-only" and schema.get("x-insert-only-scope") == "previous_load": + source_data = _filter_by_previous_load( + table, source_data, schema, previous_load_id, root_table + ) + if source_data.num_rows == 0: + return + # force-insert all remaining rows without dedup: equivalent to SQL's "MERGE ON FALSE" + table.merge( + source=source_data, + predicate="1=0", + source_alias="source", + target_alias="target", + streamed_exec=streamed_exec, + ).when_not_matched_insert_all().execute() + return + qry = table.merge( - source=ensure_delta_compatible_arrow_data(data, partition_by), + source=source_data, predicate=predicate, source_alias="source", target_alias="target", diff --git a/dlt/common/libs/pyiceberg.py b/dlt/common/libs/pyiceberg.py index 4890ce1a5c..e85852f087 100644 --- a/dlt/common/libs/pyiceberg.py +++ b/dlt/common/libs/pyiceberg.py @@ -14,7 +14,7 @@ from dlt.common.libs.pyarrow import cast_arrow_schema_types from dlt.common.libs.utils import load_open_tables from dlt.common.pipeline import SupportsPipeline -from dlt.common.schema.typing import TWriteDisposition, TTableSchema +from dlt.common.schema.typing import C_DLT_LOAD_ID, TWriteDisposition, TTableSchema from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop from dlt.common.utils import assert_min_pkg_version from dlt.common.exceptions import MissingDependencyException @@ -29,11 +29,13 @@ from pyiceberg.table import Table as IcebergTable from pyiceberg.catalog import Catalog as IcebergCatalog from pyiceberg.exceptions import NoSuchTableError + from pyiceberg.expressions import EqualTo from pyiceberg.partitioning import ( UNPARTITIONED_PARTITION_SPEC, PartitionSpec as IcebergPartitionSpec, ) import pyarrow as pa + import pyarrow.compute as pc from pydantic import BaseModel, ConfigDict, Field except ModuleNotFoundError: raise MissingDependencyException( @@ -88,11 +90,93 @@ def write_iceberg_table( ) +def _filter_by_previous_load_iceberg( + table: IcebergTable, + source_data: pa.Table, + key_cols: List[str], + previous_load_id: Optional[str] = None, + root_table: Optional[IcebergTable] = None, + root_key_col: Optional[str] = None, +) -> pa.Table: + """Remove from source rows whose keys exist in the previous load's target data.""" + if previous_load_id is None: + return source_data + + if root_table is not None and root_key_col is not None: + # child tables lack _dlt_load_id; join through root table via root_key + prev_load_keys = _get_child_keys_from_previous_load_iceberg( + table, root_table, key_cols, root_key_col, previous_load_id + ) + else: + prev_load_keys = table.scan( + selected_fields=tuple(key_cols), + row_filter=EqualTo(C_DLT_LOAD_ID, previous_load_id), + ).to_arrow() + + if prev_load_keys.num_rows == 0: + return source_data + + return _anti_join_iceberg(source_data, prev_load_keys, key_cols) + + +def _get_child_keys_from_previous_load_iceberg( + child_table: IcebergTable, + root_table: IcebergTable, + key_cols: List[str], + root_key_col: str, + previous_load_id: str, +) -> pa.Table: + """Get child table keys that belong to root rows from the previous load.""" + from dlt.common.schema.typing import C_DLT_ID + + prev_root_ids = root_table.scan( + selected_fields=(C_DLT_ID,), + row_filter=EqualTo(C_DLT_LOAD_ID, previous_load_id), + ).to_arrow().column(C_DLT_ID) + + if len(prev_root_ids) == 0: + return pa.table({c: pa.array([], type=pa.string()) for c in key_cols}) + + child_arrow = child_table.scan( + selected_fields=tuple(key_cols) + (root_key_col,), + ).to_arrow() + + if child_arrow.num_rows == 0: + return pa.table({c: pa.array([], type=pa.string()) for c in key_cols}) + + mask = pc.is_in(child_arrow.column(root_key_col), value_set=prev_root_ids) + return child_arrow.filter(mask).select(key_cols) + + +def _anti_join_iceberg( + source_data: pa.Table, prev_load_keys: pa.Table, key_cols: List[str] +) -> pa.Table: + """Remove rows from source_data whose key columns match prev_load_keys.""" + if len(key_cols) == 1: + col = key_cols[0] + mask = pc.invert(pc.is_in(source_data.column(col), value_set=prev_load_keys.column(col))) + return source_data.filter(mask) + + def concat_keys(tbl: pa.Table, cols: List[str]) -> pa.Array: + arrays = [pc.cast(tbl.column(c), pa.string()) for c in cols] + result = arrays[0] + for arr in arrays[1:]: + result = pc.binary_join_element_wise(result, arr, "\x00") + return result + + prev_concat = concat_keys(prev_load_keys, key_cols) + source_concat = concat_keys(source_data, key_cols) + mask = pc.invert(pc.is_in(source_concat, value_set=prev_concat)) + return source_data.filter(mask) + + def merge_iceberg_table( table: IcebergTable, data: pa.Table, schema: TTableSchema, load_table_name: str, + previous_load_id: Optional[str] = None, + root_table: Optional[IcebergTable] = None, ) -> None: """Merges in-memory Arrow data into on-disk Iceberg table.""" strategy = schema["x-merge-strategy"] # type: ignore[typeddict-item] @@ -106,6 +190,21 @@ def merge_iceberg_table( else: join_cols = get_columns_names_with_prop(schema, "primary_key") + if strategy == "insert-only" and schema.get("x-insert-only-scope") == "previous_load": + root_key_col = ( + get_first_column_name_with_prop(schema, "root_key") + if "parent" in schema + else None + ) + data = _filter_by_previous_load_iceberg( + table, data, join_cols, previous_load_id, root_table, root_key_col + ) + if data.num_rows == 0: + return + # append directly — upsert would match keys against all history + table.append(ensure_iceberg_compatible_arrow_data(data)) + return + # TODO: replace the batching method with transaction with pyiceberg's release after 0.9.1 for rb in data.to_batches(max_chunksize=1_000): batch_tbl = pa.Table.from_batches([rb]) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index f8a2404b9a..7829f25d10 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -250,11 +250,13 @@ class NormalizerInfo(TypedDict, total=True): TWriteDisposition = Literal["skip", "append", "replace", "merge"] TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert", "insert-only"] TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] +TInsertOnlyScope = Literal["previous_load"] WRITE_DISPOSITIONS: Sequence[TWriteDisposition] = sorted(get_args(TWriteDisposition)) MERGE_STRATEGIES: Sequence[TLoaderMergeStrategy] = sorted(get_args(TLoaderMergeStrategy)) REPLACE_STRATEGIES: Sequence[TLoaderReplaceStrategy] = sorted(get_args(TLoaderReplaceStrategy)) +INSERT_ONLY_SCOPES: Sequence[TInsertOnlyScope] = sorted(get_args(TInsertOnlyScope)) DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"] """Default values for validity column names used in `scd2` merge strategy.""" @@ -279,12 +281,17 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False): row_version_column_name: Optional[str] +class TInsertOnlyStrategyDict(TMergeDispositionDict, total=False): + scope: Optional[TInsertOnlyScope] + + TWriteDispositionConfig = Union[ TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict, TDeleteInsertStrategyDict, + TInsertOnlyStrategyDict, ] diff --git a/dlt/destinations/impl/ducklake/ducklake.py b/dlt/destinations/impl/ducklake/ducklake.py index 437b33c13b..ec4f2cb921 100644 --- a/dlt/destinations/impl/ducklake/ducklake.py +++ b/dlt/destinations/impl/ducklake/ducklake.py @@ -87,6 +87,11 @@ def gen_upsert_merge_sql( deleted_cond: Optional[str], insert_only: bool = False, not_deleted_cond: Optional[str] = None, + scope: Optional[str] = None, + dlt_load_id_col: Optional[str] = None, + loads_table_name: Optional[str] = None, + escaped_load_id: Optional[str] = None, + escaped_status: Optional[str] = None, ) -> List[str]: """Generate MERGE statement without DELETE clause + separate DELETE for hard deletes.""" # insert-only: no DuckLake-specific workaround needed @@ -100,6 +105,11 @@ def gen_upsert_merge_sql( deleted_cond, insert_only=True, not_deleted_cond=not_deleted_cond, + scope=scope, + dlt_load_id_col=dlt_load_id_col, + loads_table_name=loads_table_name, + escaped_load_id=escaped_load_id, + escaped_status=escaped_status, ) # upsert: get MERGE without DELETE clause, then add separate DELETE for hard deletes diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 493958b5cd..02d36c2bb7 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -47,7 +47,7 @@ from dlt.common.time import ensure_pendulum_datetime_utc from dlt.common.typing import ConfigValue, DictStrAny from dlt.common.schema import Schema, TSchemaTables -from dlt.common.schema.utils import get_columns_names_with_prop, is_nested_table +from dlt.common.schema.utils import get_columns_names_with_prop, get_root_table, is_nested_table from dlt.common.storages import FileStorage, fsspec_from_config from dlt.common.storages.load_package import ( LoadJobInfo, @@ -198,6 +198,28 @@ def arrow_dataset(self) -> Any: def _partition_columns(self) -> List[str]: return get_columns_names_with_prop(self._load_table, "partition") + def _get_previous_load_id(self) -> Optional[str]: + """Returns the most recently completed load_id from the loads table, or None.""" + loads_table_name = self._job_client.schema.loads_table_name + try: + files = self._job_client.list_table_files(loads_table_name) + except DestinationUndefinedEntity: + return None + # filename format: {schema_name}__{load_id}.jsonl — load_id is a float timestamp string + load_ids: List[str] = [] + for filepath in files: + filename = os.path.splitext(os.path.basename(filepath))[0] + parts = filename.rsplit(FILENAME_SEPARATOR, maxsplit=1) + if len(parts) == 2: + try: + float(parts[1]) + except ValueError: + continue + load_ids.append(parts[1]) + if not load_ids: + return None + return max(load_ids, key=float) + class DeltaLoadFilesystemJob(TableFormatLoadFilesystemJob): def run(self) -> None: @@ -226,6 +248,24 @@ def run(self) -> None: except DestinationUndefinedEntity: delta_table = None + previous_load_id: Optional[str] = None + root_delta_table = None + if ( + delta_table is not None + and self._load_table.get("x-insert-only-scope") == "previous_load" + ): + previous_load_id = self._get_previous_load_id() + if is_nested_table(self._load_table): + root_table_name = get_root_table( + self._job_client.schema.tables, self._load_table["name"] + )["name"] + try: + root_delta_table = self._job_client.load_open_table( + "delta", root_table_name + ) + except DestinationUndefinedEntity: + pass + with source_ds.scanner().to_reader() as arrow_rbr: # RecordBatchReader if self._load_table["write_disposition"] == "merge" and delta_table is not None: merge_delta_table( @@ -234,6 +274,8 @@ def run(self) -> None: schema=self._load_table, load_table_name=self.load_table_name, streamed_exec=self._job_client.config.deltalake_streamed_exec, + previous_load_id=previous_load_id, + root_table=root_delta_table, ) else: location = self._job_client.get_open_table_location("delta", self.load_table_name) @@ -313,11 +355,27 @@ def run(self) -> None: return if self._load_table["write_disposition"] == "merge" and table is not None: + previous_load_id: Optional[str] = None + root_iceberg_table = None + if self._load_table.get("x-insert-only-scope") == "previous_load": + previous_load_id = self._get_previous_load_id() + if is_nested_table(self._load_table): + root_table_name = get_root_table( + self._job_client.schema.tables, self._load_table["name"] + )["name"] + try: + root_iceberg_table = self._job_client.load_open_table( + "iceberg", root_table_name + ) + except DestinationUndefinedEntity: + pass merge_iceberg_table( table=table, data=self.arrow_dataset.to_table(), schema=self._load_table, load_table_name=self.load_table_name, + previous_load_id=previous_load_id, + root_table=root_iceberg_table, ) else: write_iceberg_table( @@ -861,6 +919,13 @@ def prepare_load_table(self, table_name: str) -> PreparedTableSchema: table["write_disposition"] = "append" else: table["x-merge-strategy"] = merge_strategy # type: ignore[typeddict-unknown-key] + if table["write_disposition"] == "merge" and is_nested_table(table): + # propagate insert-only scope from root table to child tables so + # the table-format load jobs apply the same scoped dedup behavior + root = get_root_table(self.schema.tables, table["name"]) + scope = root.get("x-insert-only-scope") + if scope: + table["x-insert-only-scope"] = scope # type: ignore[typeddict-unknown-key] if table["write_disposition"] == "replace": replace_strategy = resolve_replace_strategy( table, self.config.replace_strategy, self.capabilities diff --git a/dlt/destinations/impl/lancedb/jobs.py b/dlt/destinations/impl/lancedb/jobs.py index bbe3535843..cdf0948b09 100644 --- a/dlt/destinations/impl/lancedb/jobs.py +++ b/dlt/destinations/impl/lancedb/jobs.py @@ -7,6 +7,7 @@ RunnableLoadJob, HasFollowupJobs, ) +from dlt.common.destination.exceptions import DestinationTerminalException from dlt.common.destination.utils import resolve_merge_strategy from dlt.common.schema.typing import ( TWriteDisposition, @@ -59,6 +60,12 @@ def run(self) -> None: merge_strategy = resolve_merge_strategy( {self._load_table["name"]: self._load_table}, self._load_table ) + if self._load_table.get("x-insert-only-scope"): + raise DestinationTerminalException( + "LanceDB does not support insert-only with" + " scope='previous_load'. Remove the scope parameter" + " or use a different destination." + ) with open(self._file_path, mode="rb") as f: arrow_table: pa.Table = pq.read_table(f) diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 82083c2d13..e59e06dfc8 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -7,6 +7,8 @@ from dlt.common.typing import TAnyDateTime, TypedDict from dlt.common.schema.typing import ( + C_DLT_LOAD_ID, + LOADS_TABLE_NAME, TSortOrder, TColumnProp, ) @@ -749,6 +751,11 @@ def gen_upsert_merge_sql( deleted_cond: Optional[str], insert_only: bool = False, not_deleted_cond: Optional[str] = None, + scope: Optional[str] = None, + dlt_load_id_col: Optional[str] = None, + loads_table_name: Optional[str] = None, + escaped_load_id: Optional[str] = None, + escaped_status: Optional[str] = None, ) -> List[str]: """Generate MERGE statement for upsert/insert-only on root table. @@ -760,17 +767,42 @@ def gen_upsert_merge_sql( col_str = ", ".join(["{alias}" + c for c in root_table_column_names]) if insert_only: - staging_source = staging_root_table_name - if hard_delete_col is not None and not_deleted_cond is not None: - staging_source = ( - f"(SELECT * FROM {staging_root_table_name} WHERE {not_deleted_cond})" + if scope == "previous_load" and dlt_load_id_col is not None and loads_table_name: + # pre-filter hard-deleted records from staging before scope filter + if hard_delete_col is not None and not_deleted_cond is not None: + sql.append( + f"DELETE FROM {staging_root_table_name} WHERE NOT ({not_deleted_cond});" + ) + # remove from staging keys already present in the previous load + pk_join = " AND ".join([f"s.{c} = prev.{c}" for c in primary_keys]) + sql.append( + f"DELETE FROM {staging_root_table_name} s" + " WHERE EXISTS (" + f"SELECT 1 FROM {root_table_name} prev" + f" WHERE {pk_join}" + f" AND prev.{dlt_load_id_col} = (" + f"SELECT MAX({escaped_load_id}) FROM {loads_table_name}" + f" WHERE {escaped_status} = 0));" ) - sql.append(f""" - MERGE INTO {root_table_name} d USING {staging_source} s - ON {on_str} - WHEN NOT MATCHED - THEN INSERT ({col_str.format(alias="")}) VALUES ({col_str.format(alias="s.")}); - """) + # merge remaining staging rows — all are NOT MATCHED by construction + sql.append(f""" + MERGE INTO {root_table_name} d USING {staging_root_table_name} s + ON 1 = 0 + WHEN NOT MATCHED + THEN INSERT ({col_str.format(alias="")}) VALUES ({col_str.format(alias="s.")}); + """) + else: + staging_source = staging_root_table_name + if hard_delete_col is not None and not_deleted_cond is not None: + staging_source = ( + f"(SELECT * FROM {staging_root_table_name} WHERE {not_deleted_cond})" + ) + sql.append(f""" + MERGE INTO {root_table_name} d USING {staging_source} s + ON {on_str} + WHEN NOT MATCHED + THEN INSERT ({col_str.format(alias="")}) VALUES ({col_str.format(alias="s.")}); + """) else: update_str = ", ".join([c + " = " + "s." + c for c in root_table_column_names]) delete_str = ( @@ -815,6 +847,13 @@ def gen_upsert_sql( escape_lit, ) + # read insert-only scope hint + scope: Optional[str] = root_table.get("x-insert-only-scope") if insert_only else None # type: ignore[assignment] + dlt_load_id_col = escape_column_id(C_DLT_LOAD_ID) if scope else None + loads_table_name = sql_client.make_qualified_table_name(LOADS_TABLE_NAME) if scope else None + escaped_load_id = escape_column_id("load_id") if scope else None + escaped_status = escape_column_id("status") if scope else None + # generate merge statement for root table root_table_column_names = list(map(escape_column_id, root_table["columns"])) # we need not_deleted_cond to filter out hard deleted rows before insert @@ -833,6 +872,11 @@ def gen_upsert_sql( deleted_cond, insert_only=insert_only, not_deleted_cond=not_deleted_cond, + scope=scope, + dlt_load_id_col=dlt_load_id_col, + loads_table_name=loads_table_name, + escaped_load_id=escaped_load_id, + escaped_status=escaped_status, ) ) @@ -865,13 +909,41 @@ def gen_upsert_sql( update_str = f"WHEN MATCHED THEN UPDATE SET {update_str}" col_str = ", ".join(["{alias}" + c for c in table_column_names]) - sql.append(f""" - MERGE INTO {table_name} d USING {staging_table_name} s - ON d.{nested_row_key_column} = s.{nested_row_key_column} - {update_str} - WHEN NOT MATCHED - THEN INSERT ({col_str.format(alias="")}) VALUES ({col_str.format(alias="s.")}); - """) + if scope == "previous_load" and dlt_load_id_col is not None and loads_table_name: + # nested tables lack _dlt_load_id; join through root table + root_key_column = escape_column_id( + cls.get_root_key_col( + table_chain, + table, + sql_client.fully_qualified_dataset_name(), + sql_client.fully_qualified_dataset_name(staging=True), + ) + ) + sql.append( + f"DELETE FROM {staging_table_name} s" + " WHERE EXISTS (" + f"SELECT 1 FROM {table_name} n" + f" INNER JOIN {root_table_name} r" + f" ON n.{root_key_column} = r.{root_row_key_column}" + f" WHERE n.{nested_row_key_column} = s.{nested_row_key_column}" + f" AND r.{dlt_load_id_col} = (" + f"SELECT MAX({escaped_load_id}) FROM {loads_table_name}" + f" WHERE {escaped_status} = 0));" + ) + sql.append(f""" + MERGE INTO {table_name} d USING {staging_table_name} s + ON 1 = 0 + WHEN NOT MATCHED + THEN INSERT ({col_str.format(alias="")}) VALUES ({col_str.format(alias="s.")}); + """) + else: + sql.append(f""" + MERGE INTO {table_name} d USING {staging_table_name} s + ON d.{nested_row_key_column} = s.{nested_row_key_column} + {update_str} + WHEN NOT MATCHED + THEN INSERT ({col_str.format(alias="")}) VALUES ({col_str.format(alias="s.")}); + """) if not insert_only: root_key_column = escape_column_id( diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index efff31d3e1..11f467cfae 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -22,10 +22,12 @@ TWriteDispositionConfig, TMergeDispositionDict, TScd2StrategyDict, + TInsertOnlyStrategyDict, TAnySchemaColumns, TTableFormat, TSchemaContract, DEFAULT_VALIDITY_COLUMN_NAMES, + INSERT_ONLY_SCOPES, MERGE_STRATEGIES, TTableReferenceParam, ) @@ -703,6 +705,10 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: "unique": False, "row_key": False, } + elif merge_strategy == "insert-only": + md_dict = cast(TInsertOnlyStrategyDict, md_dict) + if scope := md_dict.get("scope"): + dict_["x-insert-only-scope"] = scope @staticmethod def _merge_incremental_column_hint(dict_: Dict[str, Any]) -> None: @@ -787,6 +793,14 @@ def validate_write_disposition_hint(template: TResourceHints) -> None: except Exception: raise ValueError(f"could not parse `{ts}` value `{wd[ts]}`") + if wd.get("strategy") == "insert-only": + wd = cast(TInsertOnlyStrategyDict, wd) + if "scope" in wd and wd["scope"] is not None: + if wd["scope"] not in INSERT_ONLY_SCOPES: + raise ValueErrorWithKnownValues( + "write_disposition['scope']", wd["scope"], INSERT_ONLY_SCOPES + ) + @staticmethod def validate_reference_hint(template: TResourceHints) -> None: ref = template.get("reference") diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 51c5ba459c..9dbdedf6b6 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -14,7 +14,11 @@ from dlt.common.configuration.plugins import PluginContext from dlt.common.configuration.resolve import inject_section from dlt.common.configuration.specs.config_section_context import ConfigSectionContext -from dlt.common.exceptions import ArgumentsOverloadException, DictValidationException +from dlt.common.exceptions import ( + ArgumentsOverloadException, + DictValidationException, + ValueErrorWithKnownValues, +) from dlt.common.pipeline import StateInjectableContext, TPipelineState from dlt.common.schema import Schema from dlt.common.schema.utils import new_table, new_column @@ -1148,6 +1152,41 @@ def invalid_disposition(): assert "write_disposition" in str(py_ex.value) +def test_resource_sets_invalid_insert_only_scope() -> None: + with pytest.raises(ValueErrorWithKnownValues) as py_ex: + + @dlt.resource( + merge_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "prev_load", + }, + ) + def invalid_scope(): + yield from [{"id": 1}] + + assert "scope" in str(py_ex.value) + assert "prev_load" in str(py_ex.value) + + +def test_resource_sets_valid_insert_only_scope() -> None: + @dlt.resource( + merge_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + ) + def valid_scope(): + yield from [{"id": 1}] + + r = valid_scope() + schema = r.compute_table_schema() + assert schema["x-insert-only-scope"] == "previous_load" + + def test_custom_source_impl() -> None: class TypedSource(DltSource): def users(self, mode: str) -> DltResource: diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py index ca73046a11..bb5918876c 100644 --- a/tests/load/pipeline/test_merge_disposition.py +++ b/tests/load/pipeline/test_merge_disposition.py @@ -2135,3 +2135,348 @@ def parent_items_update(): # Child1 exists so not re-inserted, Child2 unchanged, # Child3 and Child4 are new inserts assert len(parent_tables["parent_items__children"]) == 4 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + table_format_local_configs=True, + supports_merge=True, + ), + ids=lambda x: x.name, +) +def test_insert_only_scope_previous_load( + destination_config: DestinationTestConfiguration, +) -> None: + """Key absent from previous load is re-inserted; key present in previous load is skipped.""" + skip_if_unsupported_merge_strategy(destination_config, "insert-only") + + p = destination_config.setup_pipeline("insert_only_scope", dev_mode=True) + + @dlt.resource( + primary_key="row_hash", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def items(): + yield [ + {"row_hash": "aaa", "value": 1}, + {"row_hash": "bbb", "value": 2}, + ] + + # load 1: both inserted + info = p.run(items(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + assert_records_as_set( + tables["items"], + [ + {"row_hash": "aaa", "value": 1}, + {"row_hash": "bbb", "value": 2}, + ], + ) + + # load 2: only bbb — aaa absent from this load + @dlt.resource( + name="items", + primary_key="row_hash", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def items_load2(): + yield [{"row_hash": "bbb", "value": 2}] + + info = p.run(items_load2(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + # bbb skipped (in previous load), no new rows + assert_records_as_set( + tables["items"], + [ + {"row_hash": "aaa", "value": 1}, + {"row_hash": "bbb", "value": 2}, + ], + ) + + # load 3: aaa returns — was absent from previous load, should be re-inserted + @dlt.resource( + name="items", + primary_key="row_hash", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def items_load3(): + yield [ + {"row_hash": "aaa", "value": 1}, + {"row_hash": "bbb", "value": 2}, + ] + + info = p.run(items_load3(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + # aaa re-inserted (not in previous load), bbb skipped (was in previous load) + assert len([r for r in tables["items"] if r["row_hash"] == "aaa"]) == 2 + assert len([r for r in tables["items"] if r["row_hash"] == "bbb"]) == 2 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + table_format_local_configs=True, + supports_merge=True, + ), + ids=lambda x: x.name, +) +def test_insert_only_default_scope_unchanged( + destination_config: DestinationTestConfiguration, +) -> None: + """Without scope param, insert-only checks against all history (existing behavior).""" + skip_if_unsupported_merge_strategy(destination_config, "insert-only") + + p = destination_config.setup_pipeline("insert_only_default_scope", dev_mode=True) + + @dlt.resource( + primary_key="row_hash", + write_disposition={"disposition": "merge", "strategy": "insert-only"}, + table_format=destination_config.table_format, + ) + def items(): + yield [{"row_hash": "aaa", "value": 1}] + + # load 1 + info = p.run(items(), **destination_config.run_kwargs) + assert_load_info(info) + + # load 2: different data + @dlt.resource( + name="items", + primary_key="row_hash", + write_disposition={"disposition": "merge", "strategy": "insert-only"}, + table_format=destination_config.table_format, + ) + def items_load2(): + yield [{"row_hash": "bbb", "value": 2}] + + info = p.run(items_load2(), **destination_config.run_kwargs) + assert_load_info(info) + + # load 3: aaa returns — without scope, still skipped (in all-history) + @dlt.resource( + name="items", + primary_key="row_hash", + write_disposition={"disposition": "merge", "strategy": "insert-only"}, + table_format=destination_config.table_format, + ) + def items_load3(): + yield [{"row_hash": "aaa", "value": 1}] + + info = p.run(items_load3(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + # aaa NOT re-inserted — default scope checks all history + assert len([r for r in tables["items"] if r["row_hash"] == "aaa"]) == 1 + assert len([r for r in tables["items"] if r["row_hash"] == "bbb"]) == 1 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + table_format_local_configs=True, + supports_merge=True, + ), + ids=lambda x: x.name, +) +def test_insert_only_scope_empty_target( + destination_config: DestinationTestConfiguration, +) -> None: + """First load into empty table with scope=previous_load inserts all records.""" + skip_if_unsupported_merge_strategy(destination_config, "insert-only") + + p = destination_config.setup_pipeline("insert_only_scope_empty", dev_mode=True) + + @dlt.resource( + primary_key="row_hash", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def items(): + yield [ + {"row_hash": "aaa", "value": 1}, + {"row_hash": "bbb", "value": 2}, + ] + + info = p.run(items(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + assert_records_as_set( + tables["items"], + [ + {"row_hash": "aaa", "value": 1}, + {"row_hash": "bbb", "value": 2}, + ], + ) + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + table_format_local_configs=True, + supports_merge=True, + ), + ids=lambda x: x.name, +) +def test_insert_only_scope_with_nested_tables( + destination_config: DestinationTestConfiguration, +) -> None: + """Nested table rows absent from previous load are re-inserted with scope=previous_load.""" + skip_if_unsupported_merge_strategy(destination_config, "insert-only") + + p = destination_config.setup_pipeline("insert_only_scope_nested", dev_mode=True) + + @dlt.resource( + primary_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def parent(): + yield [ + {"id": 1, "name": "P1", "children": [{"child_id": 1, "val": "C1"}]}, + {"id": 2, "name": "P2", "children": [{"child_id": 2, "val": "C2"}]}, + ] + + # load 1: both parents + children + info = p.run(parent(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "parent", "parent__children", exclude_system_cols=True) + assert len(tables["parent"]) == 2 + assert len(tables["parent__children"]) == 2 + + # load 2: only parent 2 (parent 1 absent) + @dlt.resource( + name="parent", + primary_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def parent_load2(): + yield [{"id": 2, "name": "P2", "children": [{"child_id": 2, "val": "C2"}]}] + + info = p.run(parent_load2(), **destination_config.run_kwargs) + assert_load_info(info) + + # load 3: parent 1 returns + @dlt.resource( + name="parent", + primary_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + table_format=destination_config.table_format, + ) + def parent_load3(): + yield [ + {"id": 1, "name": "P1", "children": [{"child_id": 1, "val": "C1"}]}, + {"id": 2, "name": "P2", "children": [{"child_id": 2, "val": "C2"}]}, + ] + + info = p.run(parent_load3(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "parent", "parent__children", exclude_system_cols=True) + # parent 1 re-inserted (absent from previous load) + assert len([r for r in tables["parent"] if r["id"] == 1]) == 2 + # child rows also re-inserted + assert len([r for r in tables["parent__children"] if r["child_id"] == 1]) == 2 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + supports_merge=True, + ), + ids=lambda x: x.name, +) +def test_insert_only_scope_with_hard_delete( + destination_config: DestinationTestConfiguration, +) -> None: + """Hard-deleted records are filtered from staging before scope comparison.""" + skip_if_unsupported_merge_strategy(destination_config, "insert-only") + + p = destination_config.setup_pipeline("insert_only_scope_hard_delete", dev_mode=True) + + @dlt.resource( + primary_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + columns={"deleted": {"hard_delete": True}}, + table_format=destination_config.table_format, + ) + def items(): + yield [ + {"id": 1, "name": "Alice", "deleted": False}, + {"id": 2, "name": "Bob", "deleted": False}, + ] + + # load 1 + info = p.run(items(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + assert len(tables["items"]) == 2 + + # load 2: new record marked as deleted + existing record + @dlt.resource( + name="items", + primary_key="id", + write_disposition={ + "disposition": "merge", + "strategy": "insert-only", + "scope": "previous_load", + }, + columns={"deleted": {"hard_delete": True}}, + table_format=destination_config.table_format, + ) + def items_load2(): + yield [ + {"id": 3, "name": "Charlie", "deleted": True}, + {"id": 4, "name": "Dave", "deleted": False}, + ] + + info = p.run(items_load2(), **destination_config.run_kwargs) + assert_load_info(info) + tables = load_tables_to_dicts(p, "items", exclude_system_cols=True) + # Charlie filtered out (hard delete), Dave inserted + assert len([r for r in tables["items"] if r["name"] == "Charlie"]) == 0 + assert len([r for r in tables["items"] if r["name"] == "Dave"]) == 1