37 changes: 34 additions & 3 deletions dlt/common/json/__init__.py
@@ -2,7 +2,7 @@
import base64
import dataclasses
from datetime import date, datetime, time # noqa: I251
from typing import Any, Callable, List, Protocol, IO, Union, Dict
from typing import Any, Callable, List, Protocol, IO, Union, cast
from uuid import UUID
from enum import Enum

@@ -12,16 +12,17 @@
PydanticBaseModel = None # type: ignore[misc]

from dlt.common import known_env
from dlt.common.typing import TJsonValue
from dlt.common.exceptions import TypeErrorWithKnownTypes
from dlt.common.pendulum import pendulum
from dlt.common.arithmetics import Decimal
from dlt.common.wei import Wei
from dlt.common.utils import map_nested_values_in_place # noqa: F401
from dlt.common.utils import map_nested_values_in_place
from dlt.common.libs.hexbytes import HexBytes

TPuaDecoders = List[Callable[[Any], Any]]

JsonSerializable = Union[str, Dict[str, Any]]
JsonSerializable = TJsonValue
"""
Type representing a JSON-serializable object.
"""
@@ -39,6 +40,13 @@
This is used as a last-resort fallback for encoding objects.
"""

JSON_SCALAR_TYPES = (type(None), bool, int, float, str)
"""Python types representing JSON scalar values. Can be serialized natively without custom encoding."""
JSON_NESTED_TYPES = (dict, list, tuple)
"""Python types representing JSON nested values. Can be serialized natively without custom encoding."""
JSON_TYPES = JSON_SCALAR_TYPES + JSON_NESTED_TYPES
"""Python types representing all JSON values. Can be serialized natively without custom encoding."""


def _custom_encode(obj: Any) -> JsonSerializable:
"""Returns a JSON-serializable representation of `obj`"""
@@ -282,6 +290,28 @@ def set_custom_encoder_impl(encoder: JsonEncoder) -> None:
_custom_encoder = encoder


def to_json_value(value: object, encoder: JsonEncoder = custom_encode) -> TJsonValue:
"""Converts `value` to JSON-native Python value, using `encoder` for non-JSON-native values.

`dict` and `list` inputs are encoded in place, `tuple` inputs are converted to `list`.
"""

def _to_json_value(value: object) -> TJsonValue:
if isinstance(value, JSON_SCALAR_TYPES):
return cast(TJsonValue, value)
return cast(TJsonValue, map_nested_values_in_place(to_json_value, value, encoder=encoder))

if isinstance(value, JSON_TYPES):
return _to_json_value(value)

value = encoder(value)

if isinstance(value, JSON_TYPES):
return _to_json_value(value)

raise TypeError(f"`{repr(value)}` is not JSON serializable")


# pick the right impl
json: SupportsJson = None
if os.environ.get(known_env.DLT_USE_JSON) == "simplejson":
@@ -302,6 +332,7 @@ def set_custom_encoder_impl(encoder: JsonEncoder) -> None:
__all__ = [
"json",
"custom_encode",
"to_json_value",
"custom_pua_encode",
"custom_pua_decode",
"custom_pua_decode_nested",
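A minimal usage sketch of the new `to_json_value()` helper (editor-added illustration, not part of the diff). The exact string encodings assume the default `custom_encode` behavior (ISO strings for datetimes, strings for `Decimal`):

```python
# Illustration only: convert a row containing non-JSON-native values into a TJsonValue.
from datetime import datetime, timezone
from decimal import Decimal

from dlt.common.json import to_json_value

record = {
    "id": 1,
    "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),  # not JSON-native
    "amount": Decimal("10.50"),                                # not JSON-native
    "tags": ("a", "b"),                                        # tuple becomes a list
}
json_record = to_json_value(record)
# dict/list inputs are encoded in place; assumed result shape:
# {"id": 1, "created_at": "2024-01-01T00:00:00+00:00", "amount": "10.50", "tags": ["a", "b"]}
```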
11 changes: 11 additions & 0 deletions dlt/common/storages/load_package.py
@@ -207,6 +207,17 @@ def __str__(self) -> str:
return self.job_id()


def group_jobs_by_table_name(
jobs: Iterable[ParsedLoadJobFileName],
) -> dict[str, list[ParsedLoadJobFileName]]:
"""Returns dictionary with table names as keys and list of jobs for those tables as values."""

jobs_by_table_name: dict[str, list[ParsedLoadJobFileName]] = {}
for job in jobs:
jobs_by_table_name.setdefault(job.table_name, []).append(job)
return jobs_by_table_name


class LoadJobInfo(NamedTuple):
state: TPackageJobState
file_path: str
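A small usage sketch for `group_jobs_by_table_name()` (editor-added illustration). The file names below assume the standard `<table>.<file_id>.<retry_count>.<format>` naming that `ParsedLoadJobFileName.parse()` expects:

```python
from dlt.common.storages.load_package import ParsedLoadJobFileName, group_jobs_by_table_name

jobs = [
    ParsedLoadJobFileName.parse("events.000001.0.jsonl"),
    ParsedLoadJobFileName.parse("events.000002.0.jsonl"),
    ParsedLoadJobFileName.parse("users.000003.0.jsonl"),
]
by_table = group_jobs_by_table_name(jobs)
# {"events": [<2 jobs>], "users": [<1 job>]}  -- table insertion order is preserved
```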
10 changes: 10 additions & 0 deletions dlt/common/time.py
@@ -19,6 +19,7 @@
PAST_TIMESTAMP: float = 0.0
FUTURE_TIMESTAMP: float = 9999999999.0
DAY_DURATION_SEC: float = 24 * 60 * 60.0
UNIX_EPOCH_DATE = datetime.date(1970, 1, 1)

precise_time: Callable[[], float] = None
"""A precise timer using win_precise_time library on windows and time.time on other systems"""
@@ -284,6 +285,11 @@ def datetime_obj_to_str(
return datatime.strftime(datetime_format)


def date_to_epoch_days(value: datetime.date) -> int:
"""Converts date value to number of days since Unix epoch."""
return value.toordinal() - UNIX_EPOCH_DATE.toordinal()


def ensure_pendulum_time(value: Union[str, int, float, datetime.time, timedelta]) -> pendulum.Time:
"""Coerce a time-like value to a `pendulum.Time` object using timezone=False semantics.

@@ -439,6 +445,10 @@ def datetime_to_timestamp_ms(moment: Union[datetime.datetime, pendulum.DateTime]
return int(moment.timestamp() * 1000)


def datetime_to_timestamp_us(moment: Union[datetime.datetime, pendulum.DateTime]) -> int:
return datetime_to_timestamp(moment) * 1_000_000 + moment.microsecond


def _datetime_from_ts_or_iso(
value: Union[int, float, str]
) -> Union[pendulum.DateTime, pendulum.Date, pendulum.Time]:
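A quick sketch of the two new time helpers (editor-added illustration; the expected values follow directly from the implementations above):

```python
import datetime

import pendulum

from dlt.common.time import date_to_epoch_days, datetime_to_timestamp_us

assert date_to_epoch_days(datetime.date(1970, 1, 1)) == 0
assert date_to_epoch_days(datetime.date(1970, 1, 11)) == 10

moment = pendulum.datetime(2024, 1, 1, 0, 0, 0, 123456, tz="UTC")
# whole seconds * 1_000_000 plus the microsecond component
assert datetime_to_timestamp_us(moment) == 1_704_067_200_123_456
```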
12 changes: 12 additions & 0 deletions dlt/common/typing.py
@@ -125,6 +125,18 @@ class SecretSentinel:
"""A single data item as extracted from data source"""
TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]]
"A single data item or a list as extracted from the data source"
TDataRecord = dict[str, Any]
"""Table row dictionary. Not guaranteed to be JSON serializable without custom encoding."""
TDataRecordBatch = list[TDataRecord]
"""List of table row dictionaries. Not guaranteed to be JSON serializable without custom encoding."""
TJsonScalar = Union[None, bool, int, float, str]
"""JSON-native scalar value. Can be serialized to JSON without custom encoding."""
TJsonValue = Union[TJsonScalar, list["TJsonValue"], dict[str, "TJsonValue"]]
"""JSON-native value. Can be serialized to JSON without custom encoding."""
TJsonDataRecord = dict[str, TJsonValue]
"""JSON-native Table row dictionary. Can be serialized to JSON without custom encoding."""
TJsonDataRecordBatch = list[TJsonDataRecord]
"""List of JSON-native table row dictionaries. Can be serialized to JSON without custom encoding."""
TAnyDateTime = Union[pendulum.DateTime, pendulum.Date, datetime, date, str, float, int]
"""DateTime represented as pendulum/python object, ISO string or unix timestamp"""
TTimeInterval = Tuple[datetime, datetime]
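Illustrative annotations for the new aliases (editor-added example; the values are made up): a `TDataRecord` may hold arbitrary Python values, while the `TJson*` aliases are restricted to JSON-native ones, with `TJsonValue` nesting recursively:

```python
from datetime import datetime, timezone

from dlt.common.typing import TDataRecord, TJsonDataRecord, TJsonValue

raw_row: TDataRecord = {"id": 1, "seen_at": datetime(2024, 1, 1, tzinfo=timezone.utc)}
json_row: TJsonDataRecord = {"id": 1, "seen_at": "2024-01-01T00:00:00+00:00"}
nested: TJsonValue = {"ids": [1, 2, 3], "meta": {"ok": True, "note": None}}
```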
88 changes: 88 additions & 0 deletions dlt/destinations/file_batching.py
@@ -0,0 +1,88 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Generic, Iterator, Sequence, Sized, TypeVar

if TYPE_CHECKING:
from dlt.common.libs.pyarrow import pyarrow

from dlt.common.json import json
from dlt.common.storages import FileStorage
from dlt.common.typing import TDataRecordBatch

TRecordBatch = TypeVar("TRecordBatch", bound=Sized)


class FileBatchIterator(ABC, Generic[TRecordBatch]):
def __init__(
self,
file_path: str,
batch_size: int,
record_offset: int,
columns: Sequence[str] = (),
) -> None:
self._file_path = file_path
self._batch_size = batch_size
self._record_offset = record_offset
self._columns = list(columns)

@abstractmethod
def __iter__(self) -> Iterator[TRecordBatch]:
pass


class ParquetFileBatchIterator(FileBatchIterator["pyarrow.RecordBatch"]):
def __init__(
self,
file_path: str,
batch_size: int,
record_offset: int,
columns: Sequence[str] = (),
) -> None:
super().__init__(file_path, batch_size, record_offset, columns)
self._batch_offset, remainder = divmod(self._record_offset, self._batch_size)
assert remainder == 0, "`_record_offset` must be a multiple of `_batch_size`"

def __iter__(self) -> Iterator[pyarrow.RecordBatch]:
from dlt.common.libs.pyarrow import pyarrow

batches_to_skip = self._batch_offset
with pyarrow.parquet.ParquetFile(self._file_path) as reader:
for record_batch in reader.iter_batches(
batch_size=self._batch_size,
columns=self._columns or None,
):
if batches_to_skip > 0:
batches_to_skip -= 1
continue
yield record_batch


class JsonlFileBatchIterator(FileBatchIterator[TDataRecordBatch]):
def __iter__(self) -> Iterator[TDataRecordBatch]:
current_batch: TDataRecordBatch = []
records_to_skip = self._record_offset
projected_columns = set(self._columns)

with FileStorage.open_zipsafe_ro(self._file_path) as f:
for line in f:
records = json.typed_loads(line)
if isinstance(records, dict):
records = [records]

for record in records:
if records_to_skip > 0:
records_to_skip -= 1
continue
if projected_columns:
record = {
key: value for key, value in record.items() if key in projected_columns
}
current_batch.append(record)
if len(current_batch) == self._batch_size:
yield current_batch
current_batch = []

# yield any remaining records in last partial batch
if current_batch:
yield current_batch
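A hypothetical usage sketch for the jsonl iterator (editor-added; the path and column names are placeholders). `ParquetFileBatchIterator` is used the same way, except its `record_offset` must be a multiple of `batch_size`, since it resumes by skipping whole record batches:

```python
from dlt.destinations.file_batching import JsonlFileBatchIterator

batches = JsonlFileBatchIterator(
    file_path="/tmp/load/events.000001.0.jsonl",  # placeholder path
    batch_size=1000,
    record_offset=2000,            # resume after 2000 already-processed records
    columns=["id", "created_at"],  # project each record to these keys only
)

total_records = 0
for batch in batches:
    # each batch is a list of up to 1000 dicts containing only the projected keys
    total_records += len(batch)
```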
1 change: 0 additions & 1 deletion dlt/destinations/impl/bigquery/bigquery.py
@@ -249,7 +249,6 @@ def create_load_job(
self.config, # type: ignore
destination_state(),
_streaming_load, # type: ignore
[],
callable_requires_job_client_args=True,
)
else:
Expand Down
33 changes: 29 additions & 4 deletions dlt/destinations/impl/databricks/configuration.py
@@ -1,10 +1,14 @@
import dataclasses
from typing import ClassVar, Final, Optional, Any, Dict, List, List, Dict, cast, Callable
from typing import Any, Callable, ClassVar, Dict, Final, List, Optional, cast
from urllib.parse import urlparse

from dlt.common import logger
from dlt.common.configuration.specs.base_configuration import (
BaseConfiguration,
CredentialsConfiguration,
configspec,
)
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs.base_configuration import CredentialsConfiguration, configspec
from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.common.utils import digest128
@@ -163,10 +167,30 @@ def to_connector_params(self) -> Dict[str, Any]:
conn_params["access_token"] = self.access_token
return conn_params

def to_workspace_url(self) -> str:
if not self.server_hostname:
raise ConfigurationValueError(
"Cannot construct workspace URL: `server_hostname` is not set."
)
return f"https://{self.server_hostname}"

def __str__(self) -> str:
return f"databricks://{self.server_hostname}{self.http_path}/{self.catalog}"


@configspec
class DatabricksZerobusCredentials(CredentialsConfiguration):
client_id: str = None
client_secret: TSecretStrValue = None


@configspec
class DatabricksZerobusConfiguration(BaseConfiguration):
endpoint_url: str = None
credentials: DatabricksZerobusCredentials = None
batch_size: int = 500


@configspec
class DatabricksClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = dataclasses.field(default="databricks", init=False, repr=False, compare=False) # type: ignore[misc]
@@ -179,9 +203,10 @@ class DatabricksClientConfiguration(DestinationClientDwhWithStagingConfiguration
"""Name of the Databricks managed volume for temporary storage, e.g., <catalog_name>.<database_name>.<volume_name>. Defaults to '_dlt_temp_load_volume' if not set."""
keep_staged_files: Optional[bool] = True
"""Tells if to keep the files in internal (volume) stage"""

"""Whether PRIMARY KEY or FOREIGN KEY constrains should be created"""
create_indexes: bool = False
"""Whether PRIMARY KEY or FOREIGN KEY constrains should be created"""
zerobus: Optional[DatabricksZerobusConfiguration] = None
"""Databricks Zerobus Configuration including endpoint and credentials. Required when using the `zerobus` insert API."""

def __str__(self) -> str:
"""Return displayable destination location"""
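A sketch of building the new Zerobus settings explicitly (editor-added; every value below is a placeholder). In practice these would normally be resolved from dlt config/secrets like any other configspec:

```python
from dlt.destinations.impl.databricks.configuration import (
    DatabricksZerobusConfiguration,
    DatabricksZerobusCredentials,
)

zerobus = DatabricksZerobusConfiguration(
    endpoint_url="https://<workspace>.cloud.databricks.com/zerobus",  # placeholder URL
    credentials=DatabricksZerobusCredentials(
        client_id="my-service-principal-client-id",  # placeholder
        client_secret="***",                         # placeholder secret
    ),
    batch_size=500,
)
```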