Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions python/python/lance/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ class Blob:
uri: Optional[str] = None
position: Optional[int] = None
size: Optional[int] = None
source_id: Optional[str] = None

def __post_init__(self) -> None:
if self.data is not None and self.uri is not None:
raise ValueError("Blob cannot have both data and uri")
if self.uri == "":
raise ValueError("Blob uri cannot be empty")
if self.source_id == "":
raise ValueError("Blob source_id cannot be empty")
if self.source_id is not None and self.data is None and self.uri is None:
raise ValueError("Blob source_id cannot be set without data or uri")
if (self.position is not None or self.size is not None) and self.uri is None:
raise ValueError("External packed blob must have a uri")
if (self.position is None) != (self.size is None):
Expand All @@ -50,18 +55,23 @@ def __post_init__(self) -> None:
)

@staticmethod
def from_bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob":
return Blob(data=bytes(data))
def from_bytes(
data: Union[bytes, bytearray, memoryview], source_id: Optional[str] = None
) -> "Blob":
return Blob(data=bytes(data), source_id=source_id)

@staticmethod
def from_uri(
uri: str, position: Optional[int] = None, size: Optional[int] = None
uri: str,
position: Optional[int] = None,
size: Optional[int] = None,
source_id: Optional[str] = None,
) -> "Blob":
if uri == "":
raise ValueError("Blob uri cannot be empty")
if (position is not None and position < 0) or (size is not None and size < 0):
raise ValueError("External blob position and size must be non-negative")
return Blob(uri=uri, position=position, size=size)
return Blob(uri=uri, position=position, size=size, source_id=source_id)

@staticmethod
def empty() -> "Blob":
Expand All @@ -76,15 +86,17 @@ class BlobType(pa.ExtensionType):
descriptor format, and reads will return descriptors by default.
"""

def __init__(self) -> None:
storage_type = pa.struct(
[
pa.field("data", pa.large_binary(), nullable=True),
pa.field("uri", pa.utf8(), nullable=True),
pa.field("position", pa.uint64(), nullable=True),
pa.field("size", pa.uint64(), nullable=True),
]
)
def __init__(self, storage_type: Optional[pa.DataType] = None) -> None:
if storage_type is None:
storage_type = pa.struct(
[
pa.field("data", pa.large_binary(), nullable=True),
pa.field("uri", pa.utf8(), nullable=True),
pa.field("position", pa.uint64(), nullable=True),
pa.field("size", pa.uint64(), nullable=True),
pa.field("source_id", pa.utf8(), nullable=True),
]
)
pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2")

def __arrow_ext_serialize__(self) -> bytes:
Expand All @@ -94,7 +106,7 @@ def __arrow_ext_serialize__(self) -> bytes:
def __arrow_ext_deserialize__(
cls, storage_type: pa.DataType, serialized: bytes
) -> "BlobType":
return BlobType()
return BlobType(storage_type)

def __arrow_ext_class__(self):
return BlobArray
Expand Down Expand Up @@ -128,6 +140,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values: list[Optional[str]] = []
position_values: list[Optional[int]] = []
size_values: list[Optional[int]] = []
source_id_values: list[Optional[str]] = []
null_mask: list[bool] = []

for v in values:
Expand All @@ -136,6 +149,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(None)
position_values.append(None)
size_values.append(None)
source_id_values.append(None)
null_mask.append(True)
continue

Expand All @@ -144,6 +158,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(v.uri)
position_values.append(v.position)
size_values.append(v.size)
source_id_values.append(v.source_id)
null_mask.append(False)
continue

Expand All @@ -154,6 +169,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(v)
position_values.append(None)
size_values.append(None)
source_id_values.append(None)
null_mask.append(False)
continue

Expand All @@ -162,6 +178,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(None)
position_values.append(None)
size_values.append(None)
source_id_values.append(None)
null_mask.append(False)
continue

Expand All @@ -174,10 +191,11 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_arr = pa.array(uri_values, type=pa.utf8())
position_arr = pa.array(position_values, type=pa.uint64())
size_arr = pa.array(size_values, type=pa.uint64())
source_id_arr = pa.array(source_id_values, type=pa.utf8())
mask_arr = pa.array(null_mask, type=pa.bool_())
storage = pa.StructArray.from_arrays(
[data_arr, uri_arr, position_arr, size_arr],
names=["data", "uri", "position", "size"],
[data_arr, uri_arr, position_arr, size_arr, source_id_arr],
names=["data", "uri", "position", "size", "source_id"],
mask=mask_arr,
)
return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value]
Expand Down
187 changes: 186 additions & 1 deletion python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pyarrow as pa
import pytest
from lance import Blob, BlobColumn, BlobFile, DatasetBasePath
from lance.blob import BlobType
from lance.fragment import write_fragments

lance_dataset_module = importlib.import_module("lance.dataset")
Expand All @@ -39,6 +40,13 @@ def _commit_blob_fragments(dataset_uri, schema, fragments, initial_bases=None):
return lance.LanceDataset.commit(dataset_uri, operation)


def _field_child_names(field):
data_type = field.type
if isinstance(data_type, pa.ExtensionType):
data_type = data_type.storage_type
return [child.name for child in data_type]


def _external_blob_table(blob_path, payload=b"hello"):
blob_path.parent.mkdir(parents=True, exist_ok=True)
blob_path.write_bytes(payload)
Expand Down Expand Up @@ -764,11 +772,85 @@ def test_blob_extension_write_external(tmp_path):
],
)
def test_blob_from_uri_accepts_optional_slice_metadata(position, size):
blob = Blob.from_uri("file:///tmp/blob.bin", position=position, size=size)
blob = Blob.from_uri(
"file:///tmp/blob.bin",
position=position,
size=size,
source_id="image:1",
)

assert blob.uri == "file:///tmp/blob.bin"
assert blob.position == position
assert blob.size == size
assert blob.source_id == "image:1"


def test_blob_rejects_invalid_source_id():
with pytest.raises(ValueError, match="source_id cannot be empty"):
Blob.from_bytes(b"hello", source_id="")

with pytest.raises(ValueError, match="source_id cannot be set without data or uri"):
Blob(source_id="image:1")


def test_blob_type_ipc_round_trip_accepts_new_and_old_storage():
new_table = pa.table(
{"blob": lance.blob_array([Blob.from_bytes(b"hi", source_id="s1")])}
)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, new_table.schema) as writer:
writer.write_table(new_table)
restored = pa.ipc.open_stream(sink.getvalue()).read_all()
assert len(restored.schema.field("blob").type.storage_type) == 5

old_storage_type = pa.struct(
[
pa.field("data", pa.large_binary(), nullable=True),
pa.field("uri", pa.utf8(), nullable=True),
pa.field("position", pa.uint64(), nullable=True),
pa.field("size", pa.uint64(), nullable=True),
]
)
old_storage = pa.StructArray.from_arrays(
[
pa.array([b"hi"], type=pa.large_binary()),
pa.array([None], type=pa.utf8()),
pa.array([None], type=pa.uint64()),
pa.array([None], type=pa.uint64()),
],
names=["data", "uri", "position", "size"],
)
old_ext = pa.ExtensionArray.from_storage(BlobType(old_storage_type), old_storage)
old_table = pa.table({"blob": old_ext})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, old_table.schema) as writer:
writer.write_table(old_table)
restored = pa.ipc.open_stream(sink.getvalue()).read_all()
assert len(restored.schema.field("blob").type.storage_type) == 4


def test_blob_extension_write_rejects_source_id_without_payload(tmp_path):
storage = pa.StructArray.from_arrays(
[
pa.array([None], type=pa.large_binary()),
pa.array([None], type=pa.utf8()),
pa.array([None], type=pa.uint64()),
pa.array([None], type=pa.uint64()),
pa.array(["image:1"], type=pa.utf8()),
],
names=["data", "uri", "position", "size", "source_id"],
)
blob_array = pa.ExtensionArray.from_storage(BlobType(), storage)
table = pa.table({"blob": blob_array})

with pytest.raises(
OSError, match="source_id cannot be set on a row without data or uri"
):
lance.write_dataset(
table,
tmp_path / "test_ds_v2_source_id_without_payload",
data_storage_version="2.2",
)


def test_blob_extension_write_external_ingest(tmp_path):
Expand All @@ -792,6 +874,109 @@ def test_blob_extension_write_external_ingest(tmp_path):
assert f.read() == b"hello"


def test_blob_extension_write_source_id_shares_packed_descriptor(tmp_path):
payload = b"x" * (64 * 1024 + 1)
table = pa.table(
{
"blob": lance.blob_array(
[
Blob.from_bytes(payload, source_id="image:1"),
Blob.from_bytes(payload, source_id="image:1"),
Blob.from_bytes(payload),
]
)
}
)
ds = lance.write_dataset(
table,
tmp_path / "test_ds_v2_source_id",
data_storage_version="2.2",
)

assert "source_id" not in _field_child_names(ds.schema.field("blob"))

desc = ds.to_table(columns=["blob"]).column("blob").chunk(0)
assert "source_id" not in _field_child_names(
ds.to_table(columns=["blob"]).schema.field("blob")
)
assert desc.field("kind")[0].as_py() == desc.field("kind")[1].as_py()
assert desc.field("blob_id")[0].as_py() == desc.field("blob_id")[1].as_py()
assert desc.field("position")[0].as_py() == desc.field("position")[1].as_py()
assert desc.field("size")[0].as_py() == desc.field("size")[1].as_py()
assert desc.field("position")[1].as_py() != desc.field("position")[2].as_py()

blobs = ds.take_blobs("blob", indices=[0, 1, 2])
assert [blob.read() for blob in blobs] == [payload, payload, payload]


def test_blob_extension_write_source_id_size_mismatch_rejected(tmp_path):
table = pa.table(
{
"blob": lance.blob_array(
[
Blob.from_bytes(b"x" * (64 * 1024 + 1), source_id="image:1"),
Blob.from_bytes(b"x" * (64 * 1024 + 2), source_id="image:1"),
]
)
}
)

with pytest.raises(OSError, match="source_id 'image:1'.*first size=65537"):
lance.write_dataset(
table,
tmp_path / "test_ds_v2_source_id_mismatch",
data_storage_version="2.2",
)


def test_blob_extension_write_source_id_inline_noop(tmp_path):
table = pa.table(
{
"blob": lance.blob_array(
[
Blob.from_bytes(b"a", source_id="image:1"),
Blob.from_bytes(b"bb", source_id="image:1"),
]
)
}
)
ds = lance.write_dataset(
table,
tmp_path / "test_ds_v2_source_id_inline",
data_storage_version="2.2",
)

blobs = ds.take_blobs("blob", indices=[0, 1])
assert [blob.read() for blob in blobs] == [b"a", b"bb"]


def test_blob_extension_write_external_ingest_implicit_source_id_shares(
tmp_path,
):
payload = b"u" * (64 * 1024 + 1)
blob_path = tmp_path / "external_blob.bin"
blob_path.write_bytes(payload)
uri = blob_path.as_uri()

table = pa.table(
{"blob": lance.blob_array([Blob.from_uri(uri), Blob.from_uri(uri)])}
)
ds = lance.write_dataset(
table,
tmp_path / "test_ds_v2_external_ingest_source_id",
data_storage_version="2.2",
external_blob_mode="ingest",
)

desc = ds.to_table(columns=["blob"]).column("blob").chunk(0)
assert desc.field("blob_id")[0].as_py() == desc.field("blob_id")[1].as_py()
assert desc.field("position")[0].as_py() == desc.field("position")[1].as_py()

blob_path.unlink()
blobs = ds.take_blobs("blob", indices=[0, 1])
assert [blob.read() for blob in blobs] == [payload, payload]


def test_blob_extension_write_external_ingest_rejects_reference_only_options(tmp_path):
blob_path = tmp_path / "external_blob.bin"
blob_path.write_bytes(b"hello")
Expand Down
Loading
Loading