16 commits
- 36420d0 fix: strip trailing slash from FilesystemClient.dataset_path (mattiasthalen, Apr 15, 2026)
- eebb842 style: apply black formatting to new regression test (mattiasthalen, Apr 15, 2026)
- f3fd2de fix: strip trailing slash from FilesystemClient.get_table_dir (mattiasthalen, Apr 15, 2026)
- 93a2794 test: align trailing-slash expectations with corrected FilesystemClie… (mattiasthalen, Apr 15, 2026)
- b5aedc0 test: align test_destination_config_in_name with stripped dataset_path (mattiasthalen, Apr 15, 2026)
- 26ee8ff feat(fabric): add access_token field to FabricCredentials (mattiasthalen, Apr 16, 2026)
- 9d572f1 feat(fabric): add azure_credential field and extend get_access_token (mattiasthalen, Apr 16, 2026)
- 0d0f864 feat(fabric): omit ODBC auth fields in DSN when token-auth mode is ac… (mattiasthalen, Apr 16, 2026)
- 75b0ee0 feat(fabric): skip DefaultAzureCredential fallback in token-auth mode (mattiasthalen, Apr 16, 2026)
- 225fef3 feat(fabric): override open_connection for notebook-token auth (mattiasthalen, Apr 16, 2026)
- 57f1061 feat(azure): add OneLakeNotebookIdentityCredentials for Fabric staging (mattiasthalen, Apr 16, 2026)
- 5d89846 feat(fabric): short-circuit _ensure_fabric_token_initialized when SP … (mattiasthalen, Apr 16, 2026)
- 5163ca3 docs(fabric): document notebook user identity auth patterns (mattiasthalen, Apr 16, 2026)
- 14f223c docs(filesystem): document OneLakeNotebookIdentityCredentials for Fab… (mattiasthalen, Apr 16, 2026)
- fe97d2c fix(fabric): resolve mypy errors on test mocks and credential return … (mattiasthalen, Apr 16, 2026)
- 65f383c fix(fabric): resolve credentials in on_partial when token-auth mode i… (mattiasthalen, Apr 16, 2026)

27 changes: 27 additions & 0 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -187,6 +187,33 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
return base_kwargs


@configspec
class OneLakeNotebookIdentityCredentials(CredentialsConfiguration):
"""Azure credentials for OneLake filesystem staging under Fabric notebook identity.

Returns adlfs kwargs with `account_name` and `account_host` only -- no
`credential` key. Fabric Python notebooks register a custom
`OnelakeFileSystem` as the `abfss://` handler, and its `__init__` falls
through to a built-in `make_credential()` helper when no credential is
supplied.

Warning: only usable inside a Fabric notebook kernel.
"""

azure_storage_account_name: str = "onelake"
"""OneLake logical storage account name."""

azure_account_host: str = "onelake.blob.fabric.microsoft.com"
"""OneLake blob DFS endpoint."""

def to_adlfs_credentials(self) -> Dict[str, Any]:
"""Return adlfs kwargs with `account_name` and `account_host` only."""
return {
"account_name": self.azure_storage_account_name,
"account_host": self.azure_account_host,
}


AnyAzureCredentials = Union[
# Credentials without defaults come first because union types are attempted in order
# and explicit config should supersede system defaults
40 changes: 32 additions & 8 deletions dlt/destinations/impl/fabric/configuration.py
@@ -5,6 +5,7 @@
from dlt.common.configuration.specs import AzureServicePrincipalCredentials
from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue
from dlt import version

_AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"
@@ -38,33 +39,56 @@ class FabricCredentials(AzureServicePrincipalCredentials):
azure_storage_account_name: Optional[str] = None
"""Not used for Fabric Warehouse credentials (only staging credentials need this)"""

access_token: Optional[TSecretStrValue] = None
"""Pre-fetched AAD bearer token for Fabric Warehouse."""

azure_credential: Optional[Any] = None
"""Injectable `azure.core.credentials.TokenCredential` for Fabric Warehouse."""

def get_access_token(self) -> Optional[str]:
"""Return an AAD bearer token for Fabric Warehouse, or `None`."""
if self.access_token is not None:
return str(self.access_token)
if self.azure_credential is not None:
return self.azure_credential.get_token("https://database.windows.net/.default").token # type: ignore[no-any-return]
return None

def on_partial(self) -> None:
"""Enable fallback to DefaultAzureCredential if explicit credentials not provided."""
"""Resolve partial credentials.

When `access_token` or `azure_credential` is set, skip the
`DefaultAzureCredential` fallback -- the user has already provided auth.
"""
if self.access_token is not None or self.azure_credential is not None:
if self.host and self.database:
self.resolve()
return

try:
from azure.identity import DefaultAzureCredential
except ModuleNotFoundError:
raise MissingDependencyException(self.__class__.__name__, [_AZURE_STORAGE_EXTRA])

# If no explicit Service Principal credentials, use default credentials
if not self.azure_client_id or not self.azure_client_secret or not self.azure_tenant_id:
self._set_default_credentials(DefaultAzureCredential())
# Resolve if we have warehouse connection details (not storage account name)
if self.host and self.database:
self.resolve()

def get_odbc_dsn_dict(self) -> Dict[str, Any]:
"""Build ODBC DSN dictionary with Fabric-specific settings."""
params = {
"""Build the ODBC DSN dictionary with Fabric-specific settings."""
params: Dict[str, Any] = {
"DRIVER": "{ODBC Driver 18 for SQL Server}",
"SERVER": f"{self.host},{self.port}",
"DATABASE": self.database,
"AUTHENTICATION": "ActiveDirectoryServicePrincipal",
"LongAsMax": "yes", # Required for UTF-8 collation support
"LongAsMax": "yes",
"Encrypt": "yes",
"TrustServerCertificate": "no",
}

        # token-auth mode: the bearer token is passed via attrs_before at connect
        # time, so the DSN carries no AUTHENTICATION / UID / PWD fields
        if self.get_access_token() is not None:
            return params

        # otherwise use Service Principal authentication
        params["AUTHENTICATION"] = "ActiveDirectoryServicePrincipal"
if self.azure_client_id and self.azure_tenant_id and self.azure_client_secret:
params["UID"] = f"{self.azure_client_id}@{self.azure_tenant_id}"
params["PWD"] = str(self.azure_client_secret)
2 changes: 2 additions & 0 deletions dlt/destinations/impl/fabric/fabric.py
@@ -101,6 +101,8 @@ def _ensure_fabric_token_initialized(

Token initialization is cached per client_id to prevent excessive API calls during bulk loads.
"""
if not credentials.azure_client_secret:
return
cache_key = credentials.azure_client_id

# Check if we've already initialized the token for this client
33 changes: 29 additions & 4 deletions dlt/destinations/impl/fabric/sql_client.py
@@ -1,18 +1,25 @@
"""SQL client for Fabric Warehouse - extends Synapse SQL client"""

from typing import TYPE_CHECKING
import struct
from typing import TYPE_CHECKING, Any

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.destinations.impl.synapse.sql_client import SynapseSqlClient
from dlt.destinations.impl.mssql.sql_client import handle_datetimeoffset

if TYPE_CHECKING:
from dlt.destinations.impl.fabric.configuration import FabricCredentials


SQL_COPT_SS_ACCESS_TOKEN = 1256


class FabricSqlClient(SynapseSqlClient):
"""SQL client for Microsoft Fabric Warehouse
"""SQL client for Microsoft Fabric Warehouse.

Inherits all behavior from Synapse since Fabric Warehouse is built on Synapse technology.
Overrides `open_connection` to support passing a pre-fetched AAD bearer
token via `attrs_before={SQL_COPT_SS_ACCESS_TOKEN: ...}` when the
credentials object is in notebook-token mode.
"""

def __init__(
@@ -22,6 +29,24 @@ def __init__(
credentials: "FabricCredentials",
capabilities: DestinationCapabilitiesContext,
) -> None:
# FabricCredentials has all required attributes: database, to_odbc_dsn(), connect_timeout
super().__init__(dataset_name, staging_dataset_name, credentials, capabilities) # type: ignore[arg-type]
self.credentials: "FabricCredentials" = credentials # type: ignore[assignment]

def open_connection(self) -> Any:
"""Open a pyodbc connection, passing an AAD bearer token when available."""
import pyodbc

token_str = self.credentials.get_access_token()
if token_str is None:
return super().open_connection()

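        # SQL Server expects a 4-byte little-endian length prefix followed by the UTF-16-LE token bytes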
raw = token_str.encode("utf-16-le")
token_struct = struct.pack(f"<I{len(raw)}s", len(raw), raw)
self._conn = pyodbc.connect(
self.credentials.to_odbc_dsn(),
timeout=self.credentials.connect_timeout,
attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct},
)
self._conn.add_output_converter(-155, handle_datetimeoffset)
self._conn.autocommit = True
return self._conn
9 changes: 5 additions & 4 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -588,7 +588,7 @@ def dataset_path(self) -> str:
"""A path within a bucket to tables in a dataset
NOTE: dataset_name changes if with_staging_dataset is active
"""
return self.pathlib.join(self.bucket_path, self.dataset_name, "") # type: ignore[no-any-return]
return self.pathlib.join(self.bucket_path, self.dataset_name) # type: ignore[no-any-return]

@contextmanager
def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
@@ -874,12 +874,13 @@ def prepare_load_table(self, table_name: str) -> PreparedTableSchema:
def get_table_dir(
self, table_name: str, remote: bool = False, schema_name: Optional[str] = None
) -> str:
"""Returns a directory containing table files, ending with separator.
Note that many tables can share the same table dir
"""Returns a directory containing table files.

Note that many tables can share the same table dir.
"""
# dlt tables do not respect layout (for now)
table_prefix = self.get_table_prefix(table_name, schema_name=schema_name)
table_dir: str = self.pathlib.dirname(table_prefix) + self.pathlib.sep
table_dir: str = self.pathlib.dirname(table_prefix)
if remote:
table_dir = self.make_remote_url(table_dir)
return table_dir
71 changes: 70 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/fabric.md
@@ -44,6 +44,75 @@ Fabric Warehouse requires Azure Active Directory Service Principal authentication
- Select **SQL endpoint**
- Copy the **SQL connection string** - it should be in the format: `<guid>.datawarehouse.fabric.microsoft.com`

### Notebook user identity (Microsoft Fabric notebooks)

When running dlt from inside a Microsoft Fabric Python notebook, a Service Principal is typically not available -- the canonical auth source is `notebookutils.credentials.getToken(...)`. The Fabric destination supports two additional credential shapes for this scenario.

#### Option A: raw `access_token` string

Pass a pre-fetched AAD bearer token as the `access_token` field on `FabricCredentials`. This is the simplest pattern and suits pipelines that complete before the token expires (typically ~50 minutes):

```py
import os

import dlt

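# notebookutils is pre-injected into Fabric notebook kernels -- no import needed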
os.environ["DESTINATION__FABRIC__CREDENTIALS__ACCESS_TOKEN"] = (
notebookutils.credentials.getToken("pbi")
)
os.environ["DESTINATION__FABRIC__CREDENTIALS__HOST"] = (
"<workspace-guid>.datawarehouse.fabric.microsoft.com"
)
os.environ["DESTINATION__FABRIC__CREDENTIALS__DATABASE"] = "<warehouse-name>"

pipeline = dlt.pipeline(
pipeline_name="fabric_notebook_demo",
destination="fabric",
staging="filesystem",
dataset_name="demo",
)
```

The bearer token is consumed via `pyodbc.connect(..., attrs_before={1256: token_struct})` (`SQL_COPT_SS_ACCESS_TOKEN`) and the ODBC DSN omits `AUTHENTICATION`, `UID`, and `PWD`.
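
For reference, a minimal standalone sketch of that handshake, mirroring this PR's `FabricSqlClient.open_connection` (the DSN string here is illustrative; the real one is built by `FabricCredentials.get_odbc_dsn_dict`):

```py
import struct

import pyodbc

SQL_COPT_SS_ACCESS_TOKEN = 1256  # pyodbc exposes no named constant for this attribute

token_str = notebookutils.credentials.getToken("pbi")  # Fabric notebook built-in
raw = token_str.encode("utf-16-le")
# the driver expects a 4-byte little-endian length prefix followed by the token bytes
token_struct = struct.pack(f"<I{len(raw)}s", len(raw), raw)
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=<workspace-guid>.datawarehouse.fabric.microsoft.com,1433;"
    "DATABASE=<warehouse-name>;Encrypt=yes",
    attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct},
)
```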

**Token refresh:** the `access_token` string is static. For pipelines that may run longer than the token's validity window, use Option B.

#### Option B: injectable `TokenCredential` (refreshing)

Pass an `azure.core.credentials.TokenCredential` instance as the `azure_credential` field. The Fabric destination will call `get_token("https://database.windows.net/.default")` on each connection, delegating token caching and refresh to the credential implementation:

```py
import time

import dlt
from azure.core.credentials import AccessToken, TokenCredential

from dlt.destinations.impl.fabric.configuration import FabricCredentials


class NotebookTokenCredential(TokenCredential):
def get_token(self, *scopes, **kwargs) -> AccessToken:
token = notebookutils.credentials.getToken("pbi")
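        # report ~50 minutes of validity; dlt calls get_token again for each new connection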
return AccessToken(token, int(time.time()) + 3000)


creds = FabricCredentials()
creds.host = "<workspace-guid>.datawarehouse.fabric.microsoft.com"
creds.database = "<warehouse-name>"
creds.azure_credential = NotebookTokenCredential()

pipeline = dlt.pipeline(
pipeline_name="fabric_notebook_demo_long",
destination=dlt.destinations.fabric(credentials=creds),
staging="filesystem",
dataset_name="demo",
)
```

#### Pairing with OneLake staging

Under notebook user identity the filesystem staging side must also skip the Service Principal auth path. Use `OneLakeNotebookIdentityCredentials` on the filesystem staging config -- see the [filesystem destination OneLake section](filesystem.md#onelake-under-notebook-identity) for details.
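
A sketch of a full notebook cell wiring both sides together (the GUIDs, names, and staging path are placeholders):

```py
import dlt
from dlt.common.configuration.specs.azure_credentials import (
    OneLakeNotebookIdentityCredentials,
)
from dlt.destinations.impl.fabric.configuration import FabricCredentials

creds = FabricCredentials()
creds.host = "<workspace-guid>.datawarehouse.fabric.microsoft.com"
creds.database = "<warehouse-name>"
creds.access_token = notebookutils.credentials.getToken("pbi")  # Fabric notebook built-in

pipeline = dlt.pipeline(
    pipeline_name="fabric_notebook_paired",
    destination=dlt.destinations.fabric(credentials=creds),
    staging=dlt.destinations.filesystem(
        bucket_url=(
            "abfss://<workspace-guid>@onelake.dfs.fabric.microsoft.com"
            "/<lakehouse-guid>/Files/_dlt_stage"
        ),
        credentials=OneLakeNotebookIdentityCredentials(),
    ),
    dataset_name="demo",
)
```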

### Create a pipeline

**1. Initialize a project with a pipeline that loads to Fabric by running:**
@@ -205,7 +274,7 @@

While Fabric Warehouse is based on SQL Server, there are key differences:

1. **Authentication**: Fabric requires Service Principal; username/password auth is not supported
1. **Authentication**: Fabric supports Service Principal, raw `access_token`, and injectable `TokenCredential`; username/password auth is not supported
2. **Type System**: Uses `varchar` and `datetime2` instead of `nvarchar` and `datetimeoffset`
3. **Collation**: Optimized for UTF-8 collations with automatic `LongAsMax` configuration
4. **SQL Dialect**: Uses `fabric` SQLglot dialect for proper SQL generation
32 changes: 32 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -258,6 +258,38 @@ max_concurrency=3
```
:::

#### OneLake under notebook identity

When using dlt from inside a Microsoft Fabric Python notebook with `staging="filesystem"` pointing at a OneLake bucket, the standard Azure credential classes are not applicable -- the notebook user has no Service Principal, and the Fabric-registered `OnelakeFileSystem` handler authenticates the current notebook user only when no explicit credential is supplied.

Use `OneLakeNotebookIdentityCredentials` on the filesystem staging credentials:

```toml
[destination.filesystem]
bucket_url = "abfss://<workspace-guid>@onelake.dfs.fabric.microsoft.com/<lakehouse-guid>/Files/_dlt_stage"

[destination.filesystem.credentials]
type = "OneLakeNotebookIdentityCredentials"
```

Or in Python:

```py
from dlt.common.configuration.specs.azure_credentials import (
OneLakeNotebookIdentityCredentials,
)

filesystem_credentials = OneLakeNotebookIdentityCredentials()
```

This class returns adlfs kwargs with `account_name` and `account_host` only -- no `credential` key. Inside a Fabric notebook kernel, the registered `OnelakeFileSystem.__init__` falls through to its built-in `make_credential()` helper, producing a credential tied to the notebook user identity.
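
Concretely, the defaults resolve to the following kwargs, which is a quick way to sanity-check the class:

```py
from dlt.common.configuration.specs.azure_credentials import (
    OneLakeNotebookIdentityCredentials,
)

print(OneLakeNotebookIdentityCredentials().to_adlfs_credentials())
# {'account_name': 'onelake', 'account_host': 'onelake.blob.fabric.microsoft.com'}
```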

:::caution
`OneLakeNotebookIdentityCredentials` only works inside a Fabric notebook kernel. Outside of Fabric, the `abfss://` protocol handler is plain adlfs `AzureBlobFileSystem`, which has no built-in credential fallback and will fail authentication at the first read.
:::

Pair with `FabricCredentials.access_token` or `azure_credential` on the Fabric destination side -- see the [Fabric destination notebook identity section](fabric.md#notebook-user-identity-microsoft-fabric-notebooks).

### Hugging Face

The filesystem destination supports loading into [Hugging Face Datasets](https://huggingface.co/docs/datasets/index) using the `hf://` protocol. See the [Hugging Face destination](huggingface) page for setup and configuration details.
3 changes: 1 addition & 2 deletions tests/destinations/test_destination_name_and_config.py
@@ -214,5 +214,4 @@ def test_destination_config_in_name(environment: DictStrStr) -> None:
environment["DESTINATION__FILESYSTEM-PROD__BUCKET_URL"] = FilesystemConfiguration.make_file_url(
get_test_storage_root()
)
pathlib = p._fs_client().pathlib # type: ignore[attr-defined]
assert p._fs_client().dataset_path.endswith(p.dataset_name + pathlib.sep)
assert p._fs_client().dataset_path.endswith(p.dataset_name)