Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dlt/_workspace/cli/_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ def plug_cli_profile() -> Type[plugins.SupportsCliCommand]:

return ProfileCommand
else:
return None
from dlt._workspace.cli.commands import _ProfilePlaceholder

return _ProfilePlaceholder


@plugins.hookimpl(specname="plug_cli")
Expand All @@ -99,4 +101,6 @@ def plug_cli_workspace() -> Type[plugins.SupportsCliCommand]:

return WorkspaceCommand
else:
return None
from dlt._workspace.cli.commands import _WorkspacePlaceholder

return _WorkspacePlaceholder
41 changes: 41 additions & 0 deletions dlt/_workspace/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,44 @@ def execute(self, args: argparse.Namespace) -> None:
pin_profile(workspace_context, args.profile_name)
else:
self.parser.print_usage()


class _ProfilePlaceholder(SupportsCliCommand):
"""Registered when workspace context is not active so `dlt profile` is always discoverable."""

command = "profile"
help_string = "Manage Workspace built-in profiles (requires .dlt/.workspace)"

def configure_parser(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
# Accept but ignore any subcommands/args so the command never errors on parse
parser.add_argument("remainder", nargs="*", help=argparse.SUPPRESS)

def execute(self, args: argparse.Namespace) -> None:
fmt.warning(
"Profiles require an initialized workspace.\n"
"Create the workspace marker to enable profile support:\n\n"
" mkdir -p .dlt && touch .dlt/.workspace\n\n"
"Then run 'dlt profile' again."
)
raise CliCommandException(error_code=-1)


class _WorkspacePlaceholder(SupportsCliCommand):
"""Registered when workspace context is not active so `dlt workspace` is always discoverable."""

command = "workspace"
help_string = "Manage current Workspace (requires .dlt/.workspace)"

def configure_parser(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
parser.add_argument("remainder", nargs="*", help=argparse.SUPPRESS)

def execute(self, args: argparse.Namespace) -> None:
fmt.warning(
"Workspace commands require an initialized workspace.\n"
"Create the workspace marker to enable workspace support:\n\n"
" mkdir -p .dlt && touch .dlt/.workspace\n\n"
"Then run 'dlt workspace' again."
)
raise CliCommandException(error_code=-1)
29 changes: 19 additions & 10 deletions dlt/common/destination/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,25 +446,31 @@ def run_managed(
self._done_event = done_event

# filepath is now moved to running
# Use a local variable to track the terminal state. We must NOT publish
# to self._state until _on_completed() has finished persisting the
# pending-transition marker to disk. Otherwise the main thread can see
# "completed" via job.state(), finalize the package (rename the directory),
# and race against the worker still writing inside it. See #3849.
terminal_state: TLoadJobState = "running"
try:
self._state = "running"
self._job_client.prepare_load_job_execution(self)
self.run()
self._state = "completed"
terminal_state = "completed"
except (TerminalException, AssertionError) as e:
self._state = "failed"
terminal_state = "failed"
self._exception = e
logger.exception(f"Terminal exception in job {self.job_id()} in file {self._file_path}")
except (DestinationTransientException, Exception) as e:
self._state = "retry"
terminal_state = "retry"
self._exception = e
logger.exception(
f"Transient exception in job {self.job_id()} in file {self._file_path}"
)
finally:
# sanity check
assert self._state in ("completed", "retry", "failed")
if self._state != "retry":
assert terminal_state in ("completed", "retry", "failed")
if terminal_state != "retry":
# persist terminal state so resume can skip re-execution
if self._on_completed:
if self._exception:
Expand All @@ -477,12 +483,15 @@ def run_managed(
)
else:
failed_message = None
self._on_completed(self._state, failed_message)
self._on_completed(terminal_state, failed_message)
self._finished_at = pendulum.now()
# wake up waiting threads
if self._done_event:
with contextlib.suppress(ValueError):
self._done_event.release()
# Publish state only AFTER the marker is persisted and timestamp set.
# This is the moment the main thread is allowed to see the terminal state.
self._state = terminal_state
# wake up waiting threads for terminal (non-retry) states
if terminal_state != "retry" and self._done_event:
with contextlib.suppress(ValueError):
self._done_event.release()

@abstractmethod
def run(self) -> None:
Expand Down