diff --git a/dlt/common/configuration/specs/azure_credentials.py b/dlt/common/configuration/specs/azure_credentials.py
index d31f902433..2bc280070b 100644
--- a/dlt/common/configuration/specs/azure_credentials.py
+++ b/dlt/common/configuration/specs/azure_credentials.py
@@ -187,6 +187,33 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
         return base_kwargs
 
 
+@configspec
+class OneLakeNotebookIdentityCredentials(CredentialsConfiguration):
+    """Azure credentials for OneLake filesystem staging under Fabric notebook identity.
+
+    Returns adlfs kwargs with `account_name` and `account_host` only -- no
+    `credential` key. Fabric Python notebooks register a custom
+    `OnelakeFileSystem` as the `abfss://` handler, and its `__init__` falls
+    through to a built-in `make_credential()` helper when no credential is
+    supplied.
+
+    Warning: only usable inside a Fabric notebook kernel.
+    """
+
+    azure_storage_account_name: str = "onelake"
+    """OneLake logical storage account name."""
+
+    azure_account_host: str = "onelake.blob.fabric.microsoft.com"
+    """OneLake blob endpoint."""
+
+    def to_adlfs_credentials(self) -> Dict[str, Any]:
+        """Return adlfs kwargs with `account_name` and `account_host` only."""
+        return {
+            "account_name": self.azure_storage_account_name,
+            "account_host": self.azure_account_host,
+        }
+
+
 AnyAzureCredentials = Union[
     # Credentials without defaults come first because union types are attempted in order
     # and explicit config should supersede system defaults
diff --git a/dlt/destinations/impl/fabric/configuration.py b/dlt/destinations/impl/fabric/configuration.py
index e7cfcc2ccf..785c2d6fb3 100644
--- a/dlt/destinations/impl/fabric/configuration.py
+++ b/dlt/destinations/impl/fabric/configuration.py
@@ -5,6 +5,7 @@
 from dlt.common.configuration.specs import AzureServicePrincipalCredentials
 from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
 from dlt.common.exceptions import MissingDependencyException
+from dlt.common.typing import TSecretStrValue
 from dlt import version
 
 _AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"
@@ -38,33 +39,56 @@ class FabricCredentials(AzureServicePrincipalCredentials):
     azure_storage_account_name: Optional[str] = None
     """Not used for Fabric Warehouse credentials (only staging credentials need this)"""
 
+    access_token: Optional[TSecretStrValue] = None
+    """Pre-fetched AAD bearer token for Fabric Warehouse."""
+
+    azure_credential: Optional[Any] = None
+    """Injectable `azure.core.credentials.TokenCredential` for Fabric Warehouse."""
+
+    def get_access_token(self) -> Optional[str]:
+        """Return an AAD bearer token for Fabric Warehouse, or `None`."""
+        if self.access_token is not None:
+            return str(self.access_token)
+        if self.azure_credential is not None:
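+            # `https://database.windows.net/.default` is the AAD scope for Azure SQL
+            # endpoints, Fabric Warehouse included; the resulting token is handed to
+            # pyodbc via SQL_COPT_SS_ACCESS_TOKEN in `FabricSqlClient.open_connection`.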
+ """ + if self.access_token is not None or self.azure_credential is not None: + if self.host and self.database: + self.resolve() + return + try: from azure.identity import DefaultAzureCredential except ModuleNotFoundError: raise MissingDependencyException(self.__class__.__name__, [_AZURE_STORAGE_EXTRA]) - # If no explicit Service Principal credentials, use default credentials if not self.azure_client_id or not self.azure_client_secret or not self.azure_tenant_id: self._set_default_credentials(DefaultAzureCredential()) - # Resolve if we have warehouse connection details (not storage account name) if self.host and self.database: self.resolve() def get_odbc_dsn_dict(self) -> Dict[str, Any]: - """Build ODBC DSN dictionary with Fabric-specific settings.""" - params = { + """Build the ODBC DSN dictionary with Fabric-specific settings.""" + params: Dict[str, Any] = { "DRIVER": "{ODBC Driver 18 for SQL Server}", "SERVER": f"{self.host},{self.port}", "DATABASE": self.database, - "AUTHENTICATION": "ActiveDirectoryServicePrincipal", - "LongAsMax": "yes", # Required for UTF-8 collation support + "LongAsMax": "yes", "Encrypt": "yes", "TrustServerCertificate": "no", } - # Add Service Principal credentials if provided + if self.get_access_token() is not None: + return params + + params["AUTHENTICATION"] = "ActiveDirectoryServicePrincipal" if self.azure_client_id and self.azure_tenant_id and self.azure_client_secret: params["UID"] = f"{self.azure_client_id}@{self.azure_tenant_id}" params["PWD"] = str(self.azure_client_secret) diff --git a/dlt/destinations/impl/fabric/fabric.py b/dlt/destinations/impl/fabric/fabric.py index c0e4332ffe..e5f66470ad 100644 --- a/dlt/destinations/impl/fabric/fabric.py +++ b/dlt/destinations/impl/fabric/fabric.py @@ -101,6 +101,8 @@ def _ensure_fabric_token_initialized( Token initialization is cached per client_id to prevent excessive API calls during bulk loads. """ + if not credentials.azure_client_secret: + return cache_key = credentials.azure_client_id # Check if we've already initialized the token for this client diff --git a/dlt/destinations/impl/fabric/sql_client.py b/dlt/destinations/impl/fabric/sql_client.py index c7309eab95..174fe6d5db 100644 --- a/dlt/destinations/impl/fabric/sql_client.py +++ b/dlt/destinations/impl/fabric/sql_client.py @@ -1,18 +1,25 @@ """SQL client for Fabric Warehouse - extends Synapse SQL client""" -from typing import TYPE_CHECKING +import struct +from typing import TYPE_CHECKING, Any from dlt.common.destination import DestinationCapabilitiesContext from dlt.destinations.impl.synapse.sql_client import SynapseSqlClient +from dlt.destinations.impl.mssql.sql_client import handle_datetimeoffset if TYPE_CHECKING: from dlt.destinations.impl.fabric.configuration import FabricCredentials +SQL_COPT_SS_ACCESS_TOKEN = 1256 + + class FabricSqlClient(SynapseSqlClient): - """SQL client for Microsoft Fabric Warehouse + """SQL client for Microsoft Fabric Warehouse. - Inherits all behavior from Synapse since Fabric Warehouse is built on Synapse technology. + Overrides `open_connection` to support passing a pre-fetched AAD bearer + token via `attrs_before={SQL_COPT_SS_ACCESS_TOKEN: ...}` when the + credentials object is in notebook-token mode. 
""" def __init__( @@ -22,6 +29,24 @@ def __init__( credentials: "FabricCredentials", capabilities: DestinationCapabilitiesContext, ) -> None: - # FabricCredentials has all required attributes: database, to_odbc_dsn(), connect_timeout super().__init__(dataset_name, staging_dataset_name, credentials, capabilities) # type: ignore[arg-type] self.credentials: "FabricCredentials" = credentials # type: ignore[assignment] + + def open_connection(self) -> Any: + """Open a pyodbc connection, passing an AAD bearer token when available.""" + import pyodbc + + token_str = self.credentials.get_access_token() + if token_str is None: + return super().open_connection() + + raw = token_str.encode("utf-16-le") + token_struct = struct.pack(f" str: """A path within a bucket to tables in a dataset NOTE: dataset_name changes if with_staging_dataset is active """ - return self.pathlib.join(self.bucket_path, self.dataset_name, "") # type: ignore[no-any-return] + return self.pathlib.join(self.bucket_path, self.dataset_name) # type: ignore[no-any-return] @contextmanager def with_staging_dataset(self) -> Iterator["FilesystemClient"]: @@ -874,12 +874,13 @@ def prepare_load_table(self, table_name: str) -> PreparedTableSchema: def get_table_dir( self, table_name: str, remote: bool = False, schema_name: Optional[str] = None ) -> str: - """Returns a directory containing table files, ending with separator. - Note that many tables can share the same table dir + """Returns a directory containing table files. + + Note that many tables can share the same table dir. """ # dlt tables do not respect layout (for now) table_prefix = self.get_table_prefix(table_name, schema_name=schema_name) - table_dir: str = self.pathlib.dirname(table_prefix) + self.pathlib.sep + table_dir: str = self.pathlib.dirname(table_prefix) if remote: table_dir = self.make_remote_url(table_dir) return table_dir diff --git a/docs/website/docs/dlt-ecosystem/destinations/fabric.md b/docs/website/docs/dlt-ecosystem/destinations/fabric.md index 75ed4a1361..9bebea8867 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/fabric.md +++ b/docs/website/docs/dlt-ecosystem/destinations/fabric.md @@ -44,6 +44,75 @@ Fabric Warehouse requires Azure Active Directory Service Principal authenticatio - Select **SQL endpoint** - Copy the **SQL connection string** - it should be in the format: `.datawarehouse.fabric.microsoft.com` +### Notebook user identity (Microsoft Fabric notebooks) + +When running dlt from inside a Microsoft Fabric Python notebook, a Service Principal is typically not available -- the canonical auth source is `notebookutils.credentials.getToken(...)`. The Fabric destination supports two additional credential shapes for this scenario. + +#### Option A: raw `access_token` string + +Pass a pre-fetched AAD bearer token as the `access_token` field on `FabricCredentials`. 
+
+**Token refresh:** the `access_token` string is static. For pipelines that may run longer than the token's validity window, use Option B.
+
+#### Option B: injectable `TokenCredential` (refreshing)
+
+Pass an `azure.core.credentials.TokenCredential` instance as the `azure_credential` field. The Fabric destination will call `get_token("https://database.windows.net/.default")` on each connection, delegating token caching and refresh to the credential implementation:
+
+```py
+import time
+
+import dlt
+from azure.core.credentials import AccessToken, TokenCredential
+
+from dlt.destinations.impl.fabric.configuration import FabricCredentials
+
+
+class NotebookTokenCredential(TokenCredential):
+    def get_token(self, *scopes, **kwargs) -> AccessToken:
+        token = notebookutils.credentials.getToken("pbi")
+        return AccessToken(token, int(time.time()) + 3000)
+
+
+creds = FabricCredentials()
+creds.host = "<your-warehouse-host>.datawarehouse.fabric.microsoft.com"
+creds.database = "<your-warehouse-name>"
+creds.azure_credential = NotebookTokenCredential()
+
+pipeline = dlt.pipeline(
+    pipeline_name="fabric_notebook_demo_long",
+    destination=dlt.destinations.fabric(credentials=creds),
+    staging="filesystem",
+    dataset_name="demo",
+)
+```
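+
+If fetching a token is comparatively expensive in your environment, the credential can also cache it until close to expiry. A sketch building on the class above (the 5-minute refresh margin is an arbitrary choice, not a dlt requirement):
+
+```py
+class CachingNotebookTokenCredential(NotebookTokenCredential):
+    _cached = None
+
+    def get_token(self, *scopes, **kwargs) -> AccessToken:
+        # re-fetch when no token is cached or it is within 5 minutes of expiry
+        if self._cached is None or self._cached.expires_on - time.time() < 300:
+            self._cached = super().get_token(*scopes, **kwargs)
+        return self._cached
+```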
+
+#### Pairing with OneLake staging
+
+Under notebook user identity the filesystem staging side must also skip the Service Principal auth path. Use `OneLakeNotebookIdentityCredentials` on the filesystem staging config -- see the [filesystem destination OneLake section](filesystem.md#onelake-under-notebook-identity) for details.
+
 ### Create a pipeline
 
 **1. Initialize a project with a pipeline that loads to Fabric by running:**
@@ -205,7 +274,7 @@ driver="ODBC Driver 18 for SQL Server"
 
 While Fabric Warehouse is based on SQL Server, there are key differences:
 
-1. **Authentication**: Fabric requires Service Principal; username/password auth is not supported
+1. **Authentication**: Fabric supports Service Principal, raw `access_token`, and injectable `TokenCredential`; username/password auth is not supported
 2. **Type System**: Uses `varchar` and `datetime2` instead of `nvarchar` and `datetimeoffset`
 3. **Collation**: Optimized for UTF-8 collations with automatic `LongAsMax` configuration
 4. **SQL Dialect**: Uses `fabric` SQLglot dialect for proper SQL generation
diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md
index 1e915aed9d..f01febe5d9 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -258,6 +258,38 @@
 max_concurrency=3
 ```
 :::
 
+#### OneLake under notebook identity
+
+When using dlt from inside a Microsoft Fabric Python notebook with `staging="filesystem"` pointing at a OneLake bucket, the standard Azure credential classes are not applicable -- the notebook user has no Service Principal, and the Fabric-registered `OnelakeFileSystem` handler authenticates the current notebook user only when no explicit credential is supplied.
+
+Use `OneLakeNotebookIdentityCredentials` on the filesystem staging credentials:
+
+```toml
+[destination.filesystem]
+bucket_url = "abfss://<workspace-name>@onelake.dfs.fabric.microsoft.com/<lakehouse-name>.Lakehouse/Files/_dlt_stage"
+
+[destination.filesystem.credentials]
+type = "OneLakeNotebookIdentityCredentials"
+```
+
+Or in Python:
+
+```py
+from dlt.common.configuration.specs.azure_credentials import (
+    OneLakeNotebookIdentityCredentials,
+)
+
+filesystem_credentials = OneLakeNotebookIdentityCredentials()
+```
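+
+For a complete staging setup, the instance can be passed straight to the filesystem factory (a sketch -- the bucket URL placeholders are illustrative):
+
+```py
+import dlt
+
+staging = dlt.destinations.filesystem(
+    bucket_url="abfss://<workspace-name>@onelake.dfs.fabric.microsoft.com/<lakehouse-name>.Lakehouse/Files/_dlt_stage",
+    credentials=OneLakeNotebookIdentityCredentials(),
+)
+```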
+
+This class returns adlfs kwargs with `account_name` and `account_host` only -- no `credential` key. Inside a Fabric notebook kernel, the registered `OnelakeFileSystem.__init__` falls through to its built-in `make_credential()` helper, producing a credential tied to the notebook user identity.
+
+:::caution
+`OneLakeNotebookIdentityCredentials` only works inside a Fabric notebook kernel. Outside of Fabric, the `abfss://` protocol handler is plain adlfs `AzureBlobFileSystem`, which has no built-in credential fallback and will fail authentication at the first read.
+:::
+
+Pair with `FabricCredentials.access_token` or `azure_credential` on the Fabric destination side -- see the [Fabric destination notebook identity section](fabric.md#notebook-user-identity-microsoft-fabric-notebooks).
+
 ### Hugging Face
 
 The filesystem destination supports loading into [Hugging Face Datasets](https://huggingface.co/docs/datasets/index) using the `hf://` protocol. See the [Hugging Face destination](huggingface) page for setup and configuration details.
diff --git a/tests/destinations/test_destination_name_and_config.py b/tests/destinations/test_destination_name_and_config.py
index 66e19c8362..0b15d399fd 100644
--- a/tests/destinations/test_destination_name_and_config.py
+++ b/tests/destinations/test_destination_name_and_config.py
@@ -214,5 +214,4 @@ def test_destination_config_in_name(environment: DictStrStr) -> None:
     environment["DESTINATION__FILESYSTEM-PROD__BUCKET_URL"] = FilesystemConfiguration.make_file_url(
         get_test_storage_root()
     )
-    pathlib = p._fs_client().pathlib  # type: ignore[attr-defined]
-    assert p._fs_client().dataset_path.endswith(p.dataset_name + pathlib.sep)
+    assert p._fs_client().dataset_path.endswith(p.dataset_name)
diff --git a/tests/load/fabric/test_fabric_configuration.py b/tests/load/fabric/test_fabric_configuration.py
index 329d72c5ac..ccf77af9c6 100644
--- a/tests/load/fabric/test_fabric_configuration.py
+++ b/tests/load/fabric/test_fabric_configuration.py
@@ -194,3 +194,90 @@ def test_fabric_credentials_authentication_method() -> None:
     # Verify ActiveDirectoryServicePrincipal is set
     dsn_dict = creds.get_odbc_dsn_dict()
     assert dsn_dict["AUTHENTICATION"] == "ActiveDirectoryServicePrincipal"
+
+
+def test_get_access_token_returns_raw_string_when_set() -> None:
+    """`get_access_token()` returns the raw `access_token` when it is set."""
+    creds = FabricCredentials()
+    creds.host = "test.datawarehouse.fabric.microsoft.com"
+    creds.database = "testdb"
+    creds.access_token = "abc123"
+
+    assert creds.get_access_token() == "abc123"
+
+
+def test_get_access_token_returns_none_when_no_token_configured() -> None:
+    """`get_access_token()` returns None when neither access_token nor azure_credential is set."""
+    creds = FabricCredentials()
+    creds.host = "test.datawarehouse.fabric.microsoft.com"
+    creds.database = "testdb"
+
+    assert creds.get_access_token() is None
+
+
+def test_get_access_token_calls_injected_credential_when_set() -> None:
+    """`get_access_token()` delegates to an injected TokenCredential when
+    `access_token` is not set."""
+    from unittest.mock import MagicMock
+    from azure.core.credentials import AccessToken
+
+    fake_credential = MagicMock()
+    fake_credential.get_token.return_value = AccessToken("injected-token", 1234567890)
+
+    creds = FabricCredentials()
+    creds.host = "test.datawarehouse.fabric.microsoft.com"
+    creds.database = "testdb"
+    creds.azure_credential = fake_credential
+
+    assert creds.get_access_token() == "injected-token"
+    fake_credential.get_token.assert_called_once_with("https://database.windows.net/.default")
+
+
+def test_get_odbc_dsn_dict_omits_auth_fields_in_token_mode() -> None:
+    """When `access_token` is set, the DSN dict must not include
+    `AUTHENTICATION`/`UID`/`PWD`."""
+    creds = FabricCredentials()
+    creds.host = "test.datawarehouse.fabric.microsoft.com"
+    creds.database = "testdb"
+    creds.access_token = "abc123"
+
+    dsn_dict = creds.get_odbc_dsn_dict()
+
+    assert "AUTHENTICATION" not in dsn_dict
+    assert "UID" not in dsn_dict
+    assert "PWD" not in dsn_dict
+    assert dsn_dict["DRIVER"] == "{ODBC Driver 18 for SQL Server}"
+    assert dsn_dict["SERVER"] == "test.datawarehouse.fabric.microsoft.com,1433"
+    assert dsn_dict["DATABASE"] == "testdb"
+    assert dsn_dict["LongAsMax"] == "yes"
+
+
+def test_on_partial_skips_default_azure_credential_in_token_mode() -> None:
+    """When `access_token` is set, `on_partial` must not attempt to
+    import or instantiate `DefaultAzureCredential`."""
+    from unittest.mock import patch
+
+    creds = FabricCredentials()
+    creds.host = "test.datawarehouse.fabric.microsoft.com"
"test.datawarehouse.fabric.microsoft.com" + creds.database = "testdb" + creds.access_token = "abc123" + + with patch( + "dlt.destinations.impl.fabric.configuration.FabricCredentials._set_default_credentials", + ) as mock_set_default: + creds.on_partial() + + mock_set_default.assert_not_called() + + +def test_on_partial_resolves_when_access_token_and_host_set() -> None: + """When `access_token`, `host`, and `database` are set, `on_partial` must + call `self.resolve()` so the credentials are not left in a partial state.""" + creds = FabricCredentials() + creds.host = "test.datawarehouse.fabric.microsoft.com" + creds.database = "testdb" + creds.access_token = "abc123" + + creds.on_partial() + + assert creds.is_resolved() diff --git a/tests/load/fabric/test_fabric_sql_client.py b/tests/load/fabric/test_fabric_sql_client.py new file mode 100644 index 0000000000..fb1da91d51 --- /dev/null +++ b/tests/load/fabric/test_fabric_sql_client.py @@ -0,0 +1,138 @@ +"""Tests for `FabricSqlClient.open_connection` under mocked pyodbc.""" +from __future__ import annotations + +import struct +import sys +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + + +pytestmark = pytest.mark.essential + + +@pytest.fixture(autouse=True) +def _mock_pyodbc(monkeypatch: pytest.MonkeyPatch) -> None: # type: ignore[misc] + """Install a fake pyodbc module in sys.modules.""" + fake_pyodbc = MagicMock(name="pyodbc_module") + monkeypatch.setitem(sys.modules, "pyodbc", fake_pyodbc) + yield + + +def _fake_sql_client() -> SimpleNamespace: + """Stand-in FabricSqlClient with attributes open_connection touches.""" + creds = SimpleNamespace( + host="test.datawarehouse.fabric.microsoft.com", + port=1433, + database="testdb", + connect_timeout=15, + access_token=None, + azure_credential=None, + azure_client_id=None, + azure_tenant_id=None, + azure_client_secret=None, + ) + creds.get_access_token = lambda: ( + str(creds.access_token) if creds.access_token is not None else None + ) + creds.to_odbc_dsn = MagicMock( + return_value=( + "DRIVER={ODBC Driver 18 for SQL Server};" + "SERVER=test.datawarehouse.fabric.microsoft.com,1433;" + "DATABASE=testdb;" + "LongAsMax=yes;Encrypt=yes;TrustServerCertificate=no;" + ) + ) + return SimpleNamespace(credentials=creds, _conn=None) + + +def test_open_connection_passes_token_via_attrs_before_1256() -> None: + from dlt.destinations.impl.fabric.sql_client import FabricSqlClient + + client = _fake_sql_client() + client.credentials.access_token = "FAKE_TOKEN" + + FabricSqlClient.open_connection(client) # type: ignore[arg-type] + + pyodbc = sys.modules["pyodbc"] + assert pyodbc.connect.called + attrs_before = pyodbc.connect.call_args.kwargs["attrs_before"] + assert 1256 in attrs_before + raw = "FAKE_TOKEN".encode("utf-16-le") + expected = struct.pack(f" None: + from dlt.destinations.impl.fabric.sql_client import FabricSqlClient + + client = _fake_sql_client() + client.credentials.access_token = "FAKE_TOKEN" + client.credentials.connect_timeout = 42 + + FabricSqlClient.open_connection(client) # type: ignore[arg-type] + + pyodbc = sys.modules["pyodbc"] + assert pyodbc.connect.call_args.kwargs["timeout"] == 42 + + +def test_open_connection_uses_sp_path_when_no_access_token() -> None: + from unittest.mock import patch + + from dlt.destinations.impl.fabric.sql_client import FabricSqlClient + from dlt.destinations.impl.mssql.sql_client import PyOdbcMsSqlClient + + # super() requires a real FabricSqlClient instance, so build one + # with __init__ bypassed + client = 
+    base = _fake_sql_client()
+    client.credentials = base.credentials
+    client._conn = None
+    client.credentials.access_token = None
+
+    sentinel = object()
+    with patch.object(PyOdbcMsSqlClient, "open_connection", return_value=sentinel) as mock_super:
+        result = FabricSqlClient.open_connection(client)
+
+    mock_super.assert_called_once()
+    assert result is sentinel
+
+
+def test_open_connection_sets_autocommit_true_in_token_mode() -> None:
+    from dlt.destinations.impl.fabric.sql_client import FabricSqlClient
+
+    client = _fake_sql_client()
+    client.credentials.access_token = "FAKE_TOKEN"
+
+    FabricSqlClient.open_connection(client)  # type: ignore[arg-type]
+
+    pyodbc = sys.modules["pyodbc"]
+    returned_conn = pyodbc.connect.return_value
+    assert returned_conn.autocommit is True
+
+
+def test_open_connection_installs_datetimeoffset_converter_in_token_mode() -> None:
+    from dlt.destinations.impl.fabric.sql_client import FabricSqlClient
+
+    client = _fake_sql_client()
+    client.credentials.access_token = "FAKE_TOKEN"
+
+    FabricSqlClient.open_connection(client)  # type: ignore[arg-type]
+
+    pyodbc = sys.modules["pyodbc"]
+    returned_conn = pyodbc.connect.return_value
+    returned_conn.add_output_converter.assert_called()
+    converter_args = returned_conn.add_output_converter.call_args.args
+    assert converter_args[0] == -155
+
+
+def test_open_connection_caches_conn_on_self() -> None:
+    from dlt.destinations.impl.fabric.sql_client import FabricSqlClient
+
+    client = _fake_sql_client()
+    client.credentials.access_token = "FAKE_TOKEN"
+
+    returned = FabricSqlClient.open_connection(client)  # type: ignore[arg-type]
+
+    assert client._conn is returned
diff --git a/tests/load/fabric/test_fabric_warmup_gate.py b/tests/load/fabric/test_fabric_warmup_gate.py
new file mode 100644
index 0000000000..60e98c7b02
--- /dev/null
+++ b/tests/load/fabric/test_fabric_warmup_gate.py
@@ -0,0 +1,94 @@
+"""Tests for the defensive short-circuit on
+`FabricCopyFileLoadJob._ensure_fabric_token_initialized`."""
+from __future__ import annotations
+
+import sys
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+
+pytestmark = pytest.mark.essential
+
+
+@pytest.fixture(autouse=True)
+def _mock_pyodbc(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Install a fake pyodbc module so the mssql import chain succeeds."""
+    fake_pyodbc = MagicMock(name="pyodbc_module")
+    monkeypatch.setitem(sys.modules, "pyodbc", fake_pyodbc)
+
+
+@pytest.fixture(autouse=True)
+def _mock_azure_identity(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
+    """Install a fake azure.identity.ClientSecretCredential."""
+    fake_credential_cls = MagicMock(name="ClientSecretCredential")
+    fake_module = MagicMock(name="azure.identity")
+    fake_module.ClientSecretCredential = fake_credential_cls
+    monkeypatch.setitem(sys.modules, "azure.identity", fake_module)
+    return fake_credential_cls
+
+
+def _fake_load_job() -> SimpleNamespace:
+    return SimpleNamespace(_token_initialized_cache={})
+
+
+def test_warmup_short_circuits_when_staging_secret_is_none(
+    _mock_azure_identity: MagicMock,
+) -> None:
+    from dlt.destinations.impl.fabric.fabric import FabricCopyFileLoadJob
+
+    job = _fake_load_job()
+    staging_credentials = SimpleNamespace(
+        azure_client_id="client-id",
+        azure_tenant_id="tenant-id",
+        azure_client_secret=None,
+    )
+
+    FabricCopyFileLoadJob._ensure_fabric_token_initialized(job, staging_credentials)  # type: ignore[arg-type]
+
+    _mock_azure_identity.assert_not_called()
+
+
+def test_warmup_short_circuits_when_staging_secret_is_empty_string(
+    _mock_azure_identity: MagicMock,
+) -> None:
+    from dlt.destinations.impl.fabric.fabric import FabricCopyFileLoadJob
+
+    job = _fake_load_job()
+    staging_credentials = SimpleNamespace(
+        azure_client_id="client-id",
+        azure_tenant_id="tenant-id",
+        azure_client_secret="",
+    )
+
+    FabricCopyFileLoadJob._ensure_fabric_token_initialized(job, staging_credentials)  # type: ignore[arg-type]
+
+    _mock_azure_identity.assert_not_called()
+
+
+def test_warmup_still_runs_when_staging_secret_is_real_value(
+    _mock_azure_identity: MagicMock, monkeypatch: pytest.MonkeyPatch
+) -> None:
+    """The short-circuit must not break the SP happy path."""
+    fake_requests = MagicMock(name="requests_module")
+    fake_requests.get.return_value = SimpleNamespace(status_code=200, text="ok")
+    monkeypatch.setitem(sys.modules, "requests", fake_requests)
+
+    from dlt.destinations.impl.fabric.fabric import FabricCopyFileLoadJob
+
+    job = _fake_load_job()
+    staging_credentials = SimpleNamespace(
+        azure_client_id="client-id",
+        azure_tenant_id="tenant-id",
+        azure_client_secret="real-secret",
+    )
+
+    FabricCopyFileLoadJob._ensure_fabric_token_initialized(job, staging_credentials)  # type: ignore[arg-type]
+
+    _mock_azure_identity.assert_called_once_with(
+        tenant_id="tenant-id",
+        client_id="client-id",
+        client_secret="real-secret",
+    )
+    assert job._token_initialized_cache["client-id"] is True
diff --git a/tests/load/filesystem/test_azure_credentials.py b/tests/load/filesystem/test_azure_credentials.py
index e9138c5ec6..4d1488c72c 100644
--- a/tests/load/filesystem/test_azure_credentials.py
+++ b/tests/load/filesystem/test_azure_credentials.py
@@ -344,3 +344,37 @@ def test_azure_service_principal_pyiceberg_export_import() -> None:
 
     # test connection using imported credentials
     assert can_connect_pyiceberg_fileio_config(ABFS_BUCKET, pyiceberg_config)
+
+
+def test_onelake_notebook_identity_credentials_defaults() -> None:
+    from dlt.common.configuration.specs.azure_credentials import (
+        OneLakeNotebookIdentityCredentials,
+    )
+
+    creds = OneLakeNotebookIdentityCredentials()
+    assert creds.azure_storage_account_name == "onelake"
+    assert creds.azure_account_host == "onelake.blob.fabric.microsoft.com"
+
+
+def test_onelake_notebook_identity_to_adlfs_credentials_returns_account_only() -> None:
+    from dlt.common.configuration.specs.azure_credentials import (
+        OneLakeNotebookIdentityCredentials,
+    )
+
+    creds = OneLakeNotebookIdentityCredentials()
+    result = creds.to_adlfs_credentials()
+
+    assert set(result.keys()) == {"account_name", "account_host"}
+    assert result["account_name"] == "onelake"
+    assert result["account_host"] == "onelake.blob.fabric.microsoft.com"
+    assert "credential" not in result
+
+
+def test_onelake_notebook_identity_is_not_service_principal_subclass() -> None:
+    from dlt.common.configuration.specs.azure_credentials import (
+        AzureServicePrincipalCredentialsWithoutDefaults,
+        OneLakeNotebookIdentityCredentials,
+    )
+
+    creds = OneLakeNotebookIdentityCredentials()
+    assert not isinstance(creds, AzureServicePrincipalCredentialsWithoutDefaults)
diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py
index 9ff9a008be..21803afe7b 100644
--- a/tests/load/filesystem/test_filesystem_client.py
+++ b/tests/load/filesystem/test_filesystem_client.py
@@ -134,18 +134,18 @@ def test_trailing_separators(layout: str, with_gdrive_buckets_env: str) -> None:
     os.environ["DESTINATION__FILESYSTEM__LAYOUT"] = layout
     load = setup_loader("_data")
     client: FilesystemClient = load.get_destination_client(Schema("empty"))  # type: ignore[assignment]
-    # assert separators
-    assert client.dataset_path.endswith("_data/")
-    assert client.get_table_dir("_dlt_versions").endswith("_dlt_versions/")
-    assert client.get_table_dir("_dlt_versions", remote=True).endswith("_dlt_versions/")
+    # assert paths no longer carry a trailing separator after the strip-trailing-slash fix
+    assert client.dataset_path.endswith("_data")
+    assert client.get_table_dir("_dlt_versions").endswith("_dlt_versions")
+    assert client.get_table_dir("_dlt_versions", remote=True).endswith("_dlt_versions")
     is_folder = layout.startswith("{table_name}/")
     if is_folder:
-        assert client.get_table_dir("letters").endswith("_data/letters/")
-        assert client.get_table_dir("letters", remote=True).endswith("_data/letters/")
+        assert client.get_table_dir("letters").endswith("_data/letters")
+        assert client.get_table_dir("letters", remote=True).endswith("_data/letters")
     else:
         # strip prefix
-        assert client.get_table_dir("letters").endswith("_data/")
-        assert client.get_table_dir("letters", remote=True).endswith("_data/")
+        assert client.get_table_dir("letters").endswith("_data")
+        assert client.get_table_dir("letters", remote=True).endswith("_data")
     if is_folder:
         assert client.get_table_prefix("letters").endswith("_data/letters/")
     else:
@@ -555,3 +555,35 @@ def assert_hf_endpoint_set(*args, **kwargs):
 
     with patch("huggingface_hub.metadata_update", side_effect=assert_hf_endpoint_set):
         client.update_dataset_card_metadata(load_id="test")
+
+
+def test_dataset_path_has_no_trailing_separator() -> None:
+    """`dataset_path` must not end with `/`.
+
+    OneLake (Microsoft Fabric) responds with `403 ClientAuthenticationError`
+    when `BlobClient.exists` targets a blob name ending in `/`, instead of
+    the `404 ResourceNotFoundError` that other Azure backends return. That
+    makes `FilesystemClient.initialize_storage` fail on its first
+    `fs.isdir(self.dataset_path)` call before any data is written. On
+    non-OneLake backends the same latent defect only surfaces as a silent
+    `False`.
+    """
+    client = _client_factory(filesystem(bucket_url="file:///tmp/dlt-test-bucket"))
+    assert not client.dataset_path.endswith(
+        "/"
+    ), f"dataset_path must not end with '/', got {client.dataset_path!r}"
+
+
+def test_get_table_dir_has_no_trailing_separator() -> None:
+    """`get_table_dir` must not end with `/` or `\\`.
+
+    `FilesystemClient.truncate_tables` iterates `get_table_dirs(...)` and
+    calls `self.fs_client.exists(table_dir)` for each one. On OneLake that
+    produces a `403 ClientAuthenticationError` on every truncated table,
+    even once the `dataset_path` trailing-slash bug (see previous test) is
+    fixed. Same root cause, one level deeper.
+    """
+    client = _client_factory(filesystem(bucket_url="file:///tmp/dlt-test-bucket"))
+    table_dir = client.get_table_dir("some_table")
+    assert not table_dir.endswith(
+        ("/", "\\")
+    ), f"get_table_dir must not end with a separator, got {table_dir!r}"