Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions dlt/common/destination/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,10 @@ def run_managed(
# filepath is now moved to running
try:
self._state = "running"
self._job_client.prepare_load_job_execution(self)
self.run()
# open client connection only when running
with self._job_client:
self._job_client.prepare_load_job_execution(self)
self.run()
self._state = "completed"
except (TerminalException, AssertionError) as e:
self._state = "failed"
Expand All @@ -462,27 +464,31 @@ def run_managed(
f"Transient exception in job {self.job_id()} in file {self._file_path}"
)
finally:
# sanity check
assert self._state in ("completed", "retry", "failed")
if self._state != "retry":
# persist terminal state so resume can skip re-execution
if self._on_completed:
if self._exception:
failed_message = "".join(
traceback.format_exception(
type(self._exception),
self._exception,
self._exception.__traceback__,
)
self.release()

def release(self) -> None:
    """Finalize the job and wake any threads waiting on its completion.

    For jobs in a terminal state ("completed" or "failed") the state is
    persisted via the ``_on_completed`` callback — including a fully formatted
    traceback string when an exception was captured — so a resumed load can
    skip re-execution. Jobs in the "retry" state are deliberately NOT
    persisted, so they will be picked up and re-executed. In all cases the
    finish timestamp is recorded and the done-event is released.

    NOTE(review): this span was reconstructed from an interleaved diff view
    (old and new lines were mixed); the body below is the added version.
    """
    # sanity check: run_managed must have settled the job into a final state
    # before release() is called
    assert self._state in ("completed", "retry", "failed")
    if self._state != "retry":
        # persist terminal state so resume can skip re-execution
        if self._on_completed:
            if self._exception:
                # capture the full traceback as a single string for the
                # persisted failure record
                failed_message = "".join(
                    traceback.format_exception(
                        type(self._exception),
                        self._exception,
                        self._exception.__traceback__,
                    )
                )
            else:
                failed_message = None
            self._on_completed(self._state, failed_message)
    # recorded even for "retry" so elapsed time of this attempt is known
    self._finished_at = pendulum.now()
    # wake up waiting threads; ValueError is suppressed because releasing
    # past the semaphore's bound raises it — presumably _done_event is a
    # bounded semaphore (TODO confirm against its construction site)
    if self._done_event:
        with contextlib.suppress(ValueError):
            self._done_event.release()

@abstractmethod
def run(self) -> None:
Expand Down
4 changes: 3 additions & 1 deletion dlt/dataset/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,9 @@ def with_load_id_col(self) -> dlt.Relation:
self._dataset.schema.tables, self._table_name
)["name"]
if root_table_name == self._table_name:
raise ValueError(f"{root_table_name} is a root table, but load id column is not present.")
raise ValueError(
f"{root_table_name} is a root table, but load id column is not present."
)

join_alias = "_dlt_root"
joined = self.join(root_table_name, alias=join_alias)
Expand Down
7 changes: 5 additions & 2 deletions dlt/destinations/impl/ducklake/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,13 @@ def build_attach_statement(
)
attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:md:{catalog.database}'"
attach_params = f", METADATA_SCHEMA '{metadata_schema}'"
elif catalog.drivername in ("sqlite", "duckdb"):
# attach sqllite with multi-process access
elif catalog.drivername == "sqlite":
# attach sqlite with multi-process access
attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:{catalog.database}'"
attach_params = ", META_TYPE 'sqlite', META_JOURNAL_MODE 'WAL', META_BUSY_TIMEOUT 1000"
elif catalog.drivername == "duckdb":
# DuckDB-backed catalog: no META_TYPE, DuckLake opens it natively
attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:{catalog.database}'"
else:
raise NotImplementedError(str(catalog))
attach_statement += f" AS {ducklake_name}"
Expand Down
11 changes: 9 additions & 2 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,15 @@ def w_run_job(
if use_staging_client
else self.get_destination_client(schema)
)
with active_job_client as client:
with self.maybe_with_staging_dataset(client, use_staging_dataset):
try:
with self.maybe_with_staging_dataset(active_job_client, use_staging_dataset):
job.run_managed(active_job_client, self._done_event)
except Exception as e:
# the worker thread failed outside the managed job execution; mark the job
# for retry and release it so waiting pollers are not blocked.
# NOTE(review): `logger.exception()` on the next line is called with no
# message argument — `Logger.exception(msg, *args)` requires one and this
# call as written raises TypeError; it should pass a message, e.g.
# logger.exception(f"Job {job.job_id()} failed in worker")
logger.exception()
job._state = "retry"
job._exception = e
job.release()

def start_new_jobs(
self, load_id: str, schema: Schema, running_jobs: Sequence[LoadJob]
Expand All @@ -301,6 +307,7 @@ def start_new_jobs(
available_slots = get_available_worker_slots(self.config, caps, running_jobs)
if available_slots <= 0:
return []
logger.debug(f"Free worker slots: {available_slots}")

# get a list of jobs eligible to be started
load_files = filter_new_jobs(
Expand Down
13 changes: 11 additions & 2 deletions tests/load/ducklake/test_ducklake_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,17 @@ def test_all_catalogs(catalog: str) -> None:
# test catalog location if applicable
catalog_location = pipeline.destination_client().config.credentials.catalog.database # type: ignore
if "." in catalog_location:
# it is a file
assert pathlib.Path(get_test_storage_root(), catalog_location).exists()
catalog_file = pathlib.Path(get_test_storage_root(), catalog_location)
assert catalog_file.exists()
# verify the catalog file has the expected on-disk backend format. regression for #3870
# where a `duckdb:///` URI silently produced a SQLite-format file at `catalog.duckdb`.
header = catalog_file.read_bytes()[:16]
if catalog_location.endswith(".duckdb"):
assert b"DUCK" in header, f"expected DuckDB-format catalog, got: {header!r}"
elif catalog_location.endswith(".sqlite"):
assert header.startswith(
b"SQLite format"
), f"expected SQLite-format catalog, got: {header!r}"


@pytest.mark.parametrize(
Expand Down
Loading