# Review copy of the patch. This leading text sits before the first "diff --git" header and
# belongs to no hunk.
#
# What the patch does:
#   * dlt/_workspace/cli/_plugins.py - the `else` branches of plug_cli_profile() and
#     plug_cli_workspace() return placeholder command classes instead of None.
#   * dlt/_workspace/cli/commands.py - adds _ProfilePlaceholder and _WorkspacePlaceholder. Their
#     execute() prints a setup hint via fmt.warning() and raises CliCommandException(error_code=-1).
#   * dlt/common/destination/client.py - run_managed() records the outcome in a local
#     `terminal_state` and assigns self._state only after _on_completed() and _finished_at;
#     _done_event is released only when the outcome is not "retry".
#
# Changes made in this review copy:
#   * The placeholder parsers use nargs=argparse.REMAINDER instead of nargs="*", so flags that
#     follow a sub-command no longer make argparse fail before execute() can print the hint.
#     The commands.py hunk header is adjusted for the added comment/docstring lines; the
#     post-image blob id on its `index` line predates this change and should be regenerated.
#   * Line breaks and indentation were reconstructed from Python syntax and the hunk line
#     counts; text inside string literals is kept exactly as received.
#
# NOTE(review): in run_managed(), if self._on_completed(...) raises, the statements after it in
# the `finally` block (self._state = terminal_state, the _done_event release) appear not to run,
# leaving the job reported as "running". Confirm whether the caller handles this.
diff --git a/dlt/_workspace/cli/_plugins.py b/dlt/_workspace/cli/_plugins.py
index 2e84fa4952..d3786904ef 100644
--- a/dlt/_workspace/cli/_plugins.py
+++ b/dlt/_workspace/cli/_plugins.py
@@ -89,7 +89,9 @@ def plug_cli_profile() -> Type[plugins.SupportsCliCommand]:
 
         return ProfileCommand
     else:
-        return None
+        from dlt._workspace.cli.commands import _ProfilePlaceholder
+
+        return _ProfilePlaceholder
 
 
 @plugins.hookimpl(specname="plug_cli")
@@ -99,4 +101,6 @@ def plug_cli_workspace() -> Type[plugins.SupportsCliCommand]:
 
         return WorkspaceCommand
     else:
-        return None
+        from dlt._workspace.cli.commands import _WorkspacePlaceholder
+
+        return _WorkspacePlaceholder
diff --git a/dlt/_workspace/cli/commands.py b/dlt/_workspace/cli/commands.py
index b8b7652397..0a79deac6c 100644
--- a/dlt/_workspace/cli/commands.py
+++ b/dlt/_workspace/cli/commands.py
@@ -1114,3 +1114,51 @@ def execute(self, args: argparse.Namespace) -> None:
             pin_profile(workspace_context, args.profile_name)
         else:
             self.parser.print_usage()
+
+
+class _ProfilePlaceholder(SupportsCliCommand):
+    """Registered when workspace context is not active so `dlt profile` is always discoverable."""
+
+    command = "profile"
+    help_string = "Manage Workspace built-in profiles (requires .dlt/.workspace)"
+
+    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
+        self.parser = parser
+        # Capture the sub-command and everything that follows it, flags included, so
+        # argparse does not reject the call before execute() can print the setup hint.
+        # nargs="*" absorbed bare positionals only; a leading unknown flag is still rejected.
+        parser.add_argument("remainder", nargs=argparse.REMAINDER, help=argparse.SUPPRESS)
+
+    def execute(self, args: argparse.Namespace) -> None:
+        """Warn that no workspace is initialized, then raise `CliCommandException`."""
+        fmt.warning(
+            "Profiles require an initialized workspace.\n"
+            "Create the workspace marker to enable profile support:\n\n"
+            " mkdir -p .dlt && touch .dlt/.workspace\n\n"
+            "Then run 'dlt profile' again."
+        )
+        raise CliCommandException(error_code=-1)
+
+
+class _WorkspacePlaceholder(SupportsCliCommand):
+    """Registered when workspace context is not active so `dlt workspace` is always discoverable."""
+
+    command = "workspace"
+    help_string = "Manage current Workspace (requires .dlt/.workspace)"
+
+    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
+        self.parser = parser
+        # Capture the sub-command and everything that follows it, flags included, so
+        # argparse does not reject the call before execute() can print the setup hint.
+        # nargs="*" absorbed bare positionals only; a leading unknown flag is still rejected.
+        parser.add_argument("remainder", nargs=argparse.REMAINDER, help=argparse.SUPPRESS)
+
+    def execute(self, args: argparse.Namespace) -> None:
+        """Warn that no workspace is initialized, then raise `CliCommandException`."""
+        fmt.warning(
+            "Workspace commands require an initialized workspace.\n"
+            "Create the workspace marker to enable workspace support:\n\n"
+            " mkdir -p .dlt && touch .dlt/.workspace\n\n"
+            "Then run 'dlt workspace' again."
+        )
+        raise CliCommandException(error_code=-1)
diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py
index 5123476f1e..a9a73964b8 100644
--- a/dlt/common/destination/client.py
+++ b/dlt/common/destination/client.py
@@ -446,25 +446,31 @@ def run_managed(
         self._done_event = done_event
 
         # filepath is now moved to running
+        # Use a local variable to track the terminal state. We must NOT publish
+        # to self._state until _on_completed() has finished persisting the
+        # pending-transition marker to disk. Otherwise the main thread can see
+        # "completed" via job.state(), finalize the package (rename the directory),
+        # and race against the worker still writing inside it. See #3849.
+        terminal_state: TLoadJobState = "running"
         try:
             self._state = "running"
             self._job_client.prepare_load_job_execution(self)
             self.run()
-            self._state = "completed"
+            terminal_state = "completed"
         except (TerminalException, AssertionError) as e:
-            self._state = "failed"
+            terminal_state = "failed"
             self._exception = e
             logger.exception(f"Terminal exception in job {self.job_id()} in file {self._file_path}")
         except (DestinationTransientException, Exception) as e:
-            self._state = "retry"
+            terminal_state = "retry"
             self._exception = e
             logger.exception(
                 f"Transient exception in job {self.job_id()} in file {self._file_path}"
             )
         finally:
             # sanity check
-            assert self._state in ("completed", "retry", "failed")
-            if self._state != "retry":
+            assert terminal_state in ("completed", "retry", "failed")
+            if terminal_state != "retry":
                 # persist terminal state so resume can skip re-execution
                 if self._on_completed:
                     if self._exception:
@@ -477,12 +483,15 @@ def run_managed(
                         )
                     else:
                         failed_message = None
-                    self._on_completed(self._state, failed_message)
+                    self._on_completed(terminal_state, failed_message)
                 self._finished_at = pendulum.now()
-                # wake up waiting threads
-                if self._done_event:
-                    with contextlib.suppress(ValueError):
-                        self._done_event.release()
+            # Publish state only AFTER the marker is persisted and timestamp set.
+            # This is the moment the main thread is allowed to see the terminal state.
+            self._state = terminal_state
+            # wake up waiting threads for terminal (non-retry) states
+            if terminal_state != "retry" and self._done_event:
+                with contextlib.suppress(ValueError):
+                    self._done_event.release()
 
     @abstractmethod
     def run(self) -> None: