diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py
index dda81c0711..2f8c455421 100644
--- a/dlt/common/destination/client.py
+++ b/dlt/common/destination/client.py
@@ -448,8 +448,10 @@ def run_managed(
         # filepath is now moved to running
         try:
             self._state = "running"
-            self._job_client.prepare_load_job_execution(self)
-            self.run()
+            # open client connection only when running
+            with self._job_client:
+                self._job_client.prepare_load_job_execution(self)
+                self.run()
             self._state = "completed"
         except (TerminalException, AssertionError) as e:
             self._state = "failed"
@@ -462,27 +464,31 @@ def run_managed(
                 f"Transient exception in job {self.job_id()} in file {self._file_path}"
             )
         finally:
-            # sanity check
-            assert self._state in ("completed", "retry", "failed")
-            if self._state != "retry":
-                # persist terminal state so resume can skip re-execution
-                if self._on_completed:
-                    if self._exception:
-                        failed_message = "".join(
-                            traceback.format_exception(
-                                type(self._exception),
-                                self._exception,
-                                self._exception.__traceback__,
-                            )
+            self.release()
+
+    def release(self) -> None:
+        """Release the job from polling: persist terminal state and wake up waiting threads"""
+        # sanity check
+        assert self._state in ("completed", "retry", "failed")
+        if self._state != "retry":
+            # persist terminal state so resume can skip re-execution
+            if self._on_completed:
+                if self._exception:
+                    failed_message = "".join(
+                        traceback.format_exception(
+                            type(self._exception),
+                            self._exception,
+                            self._exception.__traceback__,
                         )
-                    else:
-                        failed_message = None
-                    self._on_completed(self._state, failed_message)
-                self._finished_at = pendulum.now()
-                # wake up waiting threads
-                if self._done_event:
-                    with contextlib.suppress(ValueError):
-                        self._done_event.release()
+                    )
+                else:
+                    failed_message = None
+                self._on_completed(self._state, failed_message)
+            self._finished_at = pendulum.now()
+            # wake up waiting threads
+            if self._done_event:
+                with contextlib.suppress(ValueError):
+                    self._done_event.release()
 
     @abstractmethod
     def run(self) -> None:
diff --git a/dlt/dataset/relation.py b/dlt/dataset/relation.py
index 13988f8644..7241aa40b1 100644
--- a/dlt/dataset/relation.py
+++ b/dlt/dataset/relation.py
@@ -613,7 +613,9 @@ def with_load_id_col(self) -> dlt.Relation:
             self._dataset.schema.tables, self._table_name
         )["name"]
         if root_table_name == self._table_name:
-            raise ValueError(f"{root_table_name} is a root table, but load id column is not present.")
+            raise ValueError(
+                f"{root_table_name} is a root table, but the load id column is not present."
+            )
 
         join_alias = "_dlt_root"
         joined = self.join(root_table_name, alias=join_alias)
diff --git a/dlt/destinations/impl/ducklake/sql_client.py b/dlt/destinations/impl/ducklake/sql_client.py
index de811d3206..ca55805742 100644
--- a/dlt/destinations/impl/ducklake/sql_client.py
+++ b/dlt/destinations/impl/ducklake/sql_client.py
@@ -145,10 +145,13 @@ def build_attach_statement(
            )
        attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:md:{catalog.database}'"
        attach_params = f", METADATA_SCHEMA '{metadata_schema}'"
-    elif catalog.drivername in ("sqlite", "duckdb"):
-        # attach sqllite with multi-process access
+    elif catalog.drivername == "sqlite":
+        # attach sqlite with multi-process access
        attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:{catalog.database}'"
        attach_params = ", META_TYPE 'sqlite', META_JOURNAL_MODE 'WAL', META_BUSY_TIMEOUT 1000"
+    elif catalog.drivername == "duckdb":
+        # DuckDB-backed catalog: no META_TYPE, DuckLake opens it natively
+        attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:{catalog.database}'"
    else:
        raise NotImplementedError(str(catalog))
    attach_statement += f" AS {ducklake_name}"
diff --git a/dlt/load/load.py b/dlt/load/load.py
index eed7a80136..b5e1b0954f 100644
--- a/dlt/load/load.py
+++ b/dlt/load/load.py
@@ -283,9 +283,15 @@ def w_run_job(
             if use_staging_client
             else self.get_destination_client(schema)
         )
-        with active_job_client as client:
-            with self.maybe_with_staging_dataset(client, use_staging_dataset):
-                job.run_managed(active_job_client, self._done_event)
+        try:
+            with self.maybe_with_staging_dataset(active_job_client, use_staging_dataset):
+                job.run_managed(active_job_client, self._done_event)
+        except Exception as e:
+            # worker died in an uncontrollable manner: mark the job for retry and release it
+            logger.exception(f"Worker for job {job.job_id()} died in an uncontrollable manner")
+            job._state = "retry"
+            job._exception = e
+            job.release()
 
     def start_new_jobs(
         self, load_id: str, schema: Schema, running_jobs: Sequence[LoadJob]
@@ -301,6 +307,7 @@ def start_new_jobs(
         available_slots = get_available_worker_slots(self.config, caps, running_jobs)
         if available_slots <= 0:
             return []
+        logger.debug(f"Free worker slots: {available_slots}")
 
         # get a list of jobs eligible to be started
         load_files = filter_new_jobs(
diff --git a/tests/load/ducklake/test_ducklake_pipeline.py b/tests/load/ducklake/test_ducklake_pipeline.py
index b2613f978b..94d991f8fb 100644
--- a/tests/load/ducklake/test_ducklake_pipeline.py
+++ b/tests/load/ducklake/test_ducklake_pipeline.py
@@ -78,8 +78,17 @@ def test_all_catalogs(catalog: str) -> None:
     # test catalog location if applicable
     catalog_location = pipeline.destination_client().config.credentials.catalog.database  # type: ignore
     if "." in catalog_location:
-        # it is a file
-        assert pathlib.Path(get_test_storage_root(), catalog_location).exists()
+        catalog_file = pathlib.Path(get_test_storage_root(), catalog_location)
+        assert catalog_file.exists()
+        # verify the catalog file has the expected on-disk backend format: regression test for #3870,
+        # where a `duckdb:///` URI silently produced a SQLite-format file at `catalog.duckdb`
+        header = catalog_file.read_bytes()[:16]
+        if catalog_location.endswith(".duckdb"):
+            assert b"DUCK" in header, f"expected DuckDB-format catalog, got: {header!r}"
+        elif catalog_location.endswith(".sqlite"):
+            assert header.startswith(
+                b"SQLite format"
+            ), f"expected SQLite-format catalog, got: {header!r}"
 
 
 @pytest.mark.parametrize(
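
For reference, a minimal standalone sketch of the catalog-driver branching fixed in `build_attach_statement`, assuming only the `drivername` and `database` fields of the catalog URL; the helper name and the tuple return are illustrative, not dlt's API:

def build_attach_parts(drivername: str, database: str) -> tuple[str, str]:
    """Return (attach_statement, attach_params) for a DuckLake catalog."""
    attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:{database}'"
    if drivername == "sqlite":
        # a SQLite catalog needs an explicit META_TYPE plus WAL for multi-process access
        return attach_statement, ", META_TYPE 'sqlite', META_JOURNAL_MODE 'WAL', META_BUSY_TIMEOUT 1000"
    if drivername == "duckdb":
        # no META_TYPE: DuckLake opens a DuckDB-format catalog natively
        return attach_statement, ""
    raise NotImplementedError(drivername)

Folding `duckdb` into the `sqlite` branch was the regression behind #3870: the sqlite metadata driver created a SQLite-format file even when the catalog path ended in `.duckdb`, which is exactly what the header assertions in the updated test pin down.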
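
Likewise, a toy sketch of the finalization split between `run_managed` and `release` introduced in client.py and load.py, showing why the loader can now recover a job whose worker died outside managed execution; `ToyJob`, `w_run_job` and the semaphore wiring here are stand-ins, not dlt's real classes:

import threading
from typing import Optional


class ToyJob:
    def __init__(self, done_event: threading.BoundedSemaphore) -> None:
        self._state = "ready"
        self._exception: Optional[BaseException] = None
        self._done_event = done_event

    def run(self) -> None:
        print("copying file to destination")

    def run_managed(self) -> None:
        # controlled path: run_managed reaches its finally block and finalizes itself
        try:
            self._state = "running"
            self.run()
            self._state = "completed"
        except Exception as e:
            self._state = "failed"
            self._exception = e
        finally:
            self.release()

    def release(self) -> None:
        """Persist terminal state and wake up waiting pollers."""
        assert self._state in ("completed", "retry", "failed")
        try:
            self._done_event.release()  # wake up waiting threads
        except ValueError:
            pass  # bounded semaphore is already at its ceiling


def w_run_job(job: ToyJob) -> None:
    # mirrors the worker wrapper in load.py: an exception raised outside
    # run_managed still marks the job for retry and releases it for the poller
    try:
        job.run_managed()
    except Exception as e:
        job._state = "retry"
        job._exception = e
        job.release()


done = threading.BoundedSemaphore(1)
done.acquire()   # poller blocks until the job is released
job = ToyJob(done)
w_run_job(job)
done.acquire()   # returns as soon as release() fires
print(job._state)  # "completed"

The uncontrolled path matters because, in the patch, an exception thrown before or around `run_managed` (for example while entering `maybe_with_staging_dataset`) would otherwise leave the done event unreleased and the poller waiting.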