From 3df5c98d0619038edab8872b4c48ebc6c7289bd6 Mon Sep 17 00:00:00 2001 From: ugbotueferhire Date: Sat, 11 Apr 2026 08:24:26 +0100 Subject: [PATCH 1/2] fix: defer state publish in RunnableLoadJob until _on_completed marker is persisted (#3849) --- dlt/common/destination/client.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py index 5123476f1e..a9a73964b8 100644 --- a/dlt/common/destination/client.py +++ b/dlt/common/destination/client.py @@ -446,25 +446,31 @@ def run_managed( self._done_event = done_event # filepath is now moved to running + # Use a local variable to track the terminal state. We must NOT publish + # to self._state until _on_completed() has finished persisting the + # pending-transition marker to disk. Otherwise the main thread can see + # "completed" via job.state(), finalize the package (rename the directory), + # and race against the worker still writing inside it. See #3849. + terminal_state: TLoadJobState = "running" try: self._state = "running" self._job_client.prepare_load_job_execution(self) self.run() - self._state = "completed" + terminal_state = "completed" except (TerminalException, AssertionError) as e: - self._state = "failed" + terminal_state = "failed" self._exception = e logger.exception(f"Terminal exception in job {self.job_id()} in file {self._file_path}") except (DestinationTransientException, Exception) as e: - self._state = "retry" + terminal_state = "retry" self._exception = e logger.exception( f"Transient exception in job {self.job_id()} in file {self._file_path}" ) finally: # sanity check - assert self._state in ("completed", "retry", "failed") - if self._state != "retry": + assert terminal_state in ("completed", "retry", "failed") + if terminal_state != "retry": # persist terminal state so resume can skip re-execution if self._on_completed: if self._exception: @@ -477,12 +483,15 @@ def run_managed( ) else: failed_message = None - self._on_completed(self._state, failed_message) + self._on_completed(terminal_state, failed_message) self._finished_at = pendulum.now() - # wake up waiting threads - if self._done_event: - with contextlib.suppress(ValueError): - self._done_event.release() + # Publish state only AFTER the marker is persisted and timestamp set. + # This is the moment the main thread is allowed to see the terminal state. + self._state = terminal_state + # wake up waiting threads for terminal (non-retry) states + if terminal_state != "retry" and self._done_event: + with contextlib.suppress(ValueError): + self._done_event.release() @abstractmethod def run(self) -> None: From 7b05296afc16692eebe7adddc0f553774e581f82 Mon Sep 17 00:00:00 2001 From: ugbotueferhire Date: Sat, 11 Apr 2026 10:43:36 +0100 Subject: [PATCH 2/2] fix(cli): always register profile and workspace commands (#3788) When workspace context is not active, dlt profile and dlt workspace were completely absent from the CLI because the plugin hooks returned None. This adds placeholder command classes that print actionable setup instructions instead of silently vanishing. --- dlt/_workspace/cli/_plugins.py | 8 +++++-- dlt/_workspace/cli/commands.py | 41 ++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/dlt/_workspace/cli/_plugins.py b/dlt/_workspace/cli/_plugins.py index 2e84fa4952..d3786904ef 100644 --- a/dlt/_workspace/cli/_plugins.py +++ b/dlt/_workspace/cli/_plugins.py @@ -89,7 +89,9 @@ def plug_cli_profile() -> Type[plugins.SupportsCliCommand]: return ProfileCommand else: - return None + from dlt._workspace.cli.commands import _ProfilePlaceholder + + return _ProfilePlaceholder @plugins.hookimpl(specname="plug_cli") @@ -99,4 +101,6 @@ def plug_cli_workspace() -> Type[plugins.SupportsCliCommand]: return WorkspaceCommand else: - return None + from dlt._workspace.cli.commands import _WorkspacePlaceholder + + return _WorkspacePlaceholder diff --git a/dlt/_workspace/cli/commands.py b/dlt/_workspace/cli/commands.py index b8b7652397..0a79deac6c 100644 --- a/dlt/_workspace/cli/commands.py +++ b/dlt/_workspace/cli/commands.py @@ -1114,3 +1114,44 @@ def execute(self, args: argparse.Namespace) -> None: pin_profile(workspace_context, args.profile_name) else: self.parser.print_usage() + + +class _ProfilePlaceholder(SupportsCliCommand): + """Registered when workspace context is not active so `dlt profile` is always discoverable.""" + + command = "profile" + help_string = "Manage Workspace built-in profiles (requires .dlt/.workspace)" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + self.parser = parser + # Accept but ignore any subcommands/args so the command never errors on parse + parser.add_argument("remainder", nargs="*", help=argparse.SUPPRESS) + + def execute(self, args: argparse.Namespace) -> None: + fmt.warning( + "Profiles require an initialized workspace.\n" + "Create the workspace marker to enable profile support:\n\n" + " mkdir -p .dlt && touch .dlt/.workspace\n\n" + "Then run 'dlt profile' again." + ) + raise CliCommandException(error_code=-1) + + +class _WorkspacePlaceholder(SupportsCliCommand): + """Registered when workspace context is not active so `dlt workspace` is always discoverable.""" + + command = "workspace" + help_string = "Manage current Workspace (requires .dlt/.workspace)" + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + self.parser = parser + parser.add_argument("remainder", nargs="*", help=argparse.SUPPRESS) + + def execute(self, args: argparse.Namespace) -> None: + fmt.warning( + "Workspace commands require an initialized workspace.\n" + "Create the workspace marker to enable workspace support:\n\n" + " mkdir -p .dlt && touch .dlt/.workspace\n\n" + "Then run 'dlt workspace' again." + ) + raise CliCommandException(error_code=-1)