From 60e21b1144c4b4b89cae1256b0f4a83b10ec6f97 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 24 Jun 2026 18:25:33 +0800 Subject: [PATCH] feat: add blob v2 source id sharing --- python/python/lance/blob.py | 50 +- python/python/tests/test_blob.py | 187 +++++- rust/lance-core/src/datatypes/field.rs | 41 +- rust/lance/src/blob.rs | 289 ++++++++-- rust/lance/src/dataset/blob.rs | 765 +++++++++++++++++++++---- 5 files changed, 1144 insertions(+), 188 deletions(-) diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index 46faf760cdd..34c16223cff 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -25,12 +25,17 @@ class Blob: uri: Optional[str] = None position: Optional[int] = None size: Optional[int] = None + source_id: Optional[str] = None def __post_init__(self) -> None: if self.data is not None and self.uri is not None: raise ValueError("Blob cannot have both data and uri") if self.uri == "": raise ValueError("Blob uri cannot be empty") + if self.source_id == "": + raise ValueError("Blob source_id cannot be empty") + if self.source_id is not None and self.data is None and self.uri is None: + raise ValueError("Blob source_id cannot be set without data or uri") if (self.position is not None or self.size is not None) and self.uri is None: raise ValueError("External packed blob must have a uri") if (self.position is None) != (self.size is None): @@ -43,18 +48,23 @@ def __post_init__(self) -> None: ) @staticmethod - def from_bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob": - return Blob(data=bytes(data)) + def from_bytes( + data: Union[bytes, bytearray, memoryview], source_id: Optional[str] = None + ) -> "Blob": + return Blob(data=bytes(data), source_id=source_id) @staticmethod def from_uri( - uri: str, position: Optional[int] = None, size: Optional[int] = None + uri: str, + position: Optional[int] = None, + size: Optional[int] = None, + source_id: Optional[str] = None, ) -> "Blob": if uri == "": raise ValueError("Blob uri cannot be empty") if (position is not None and position < 0) or (size is not None and size < 0): raise ValueError("External blob position and size must be non-negative") - return Blob(uri=uri, position=position, size=size) + return Blob(uri=uri, position=position, size=size, source_id=source_id) @staticmethod def empty() -> "Blob": @@ -69,15 +79,17 @@ class BlobType(pa.ExtensionType): descriptor format, and reads will return descriptors by default. """ - def __init__(self) -> None: - storage_type = pa.struct( - [ - pa.field("data", pa.large_binary(), nullable=True), - pa.field("uri", pa.utf8(), nullable=True), - pa.field("position", pa.uint64(), nullable=True), - pa.field("size", pa.uint64(), nullable=True), - ] - ) + def __init__(self, storage_type: Optional[pa.DataType] = None) -> None: + if storage_type is None: + storage_type = pa.struct( + [ + pa.field("data", pa.large_binary(), nullable=True), + pa.field("uri", pa.utf8(), nullable=True), + pa.field("position", pa.uint64(), nullable=True), + pa.field("size", pa.uint64(), nullable=True), + pa.field("source_id", pa.utf8(), nullable=True), + ] + ) pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2") def __arrow_ext_serialize__(self) -> bytes: @@ -87,7 +99,7 @@ def __arrow_ext_serialize__(self) -> bytes: def __arrow_ext_deserialize__( cls, storage_type: pa.DataType, serialized: bytes ) -> "BlobType": - return BlobType() + return BlobType(storage_type) def __arrow_ext_class__(self): return BlobArray @@ -121,6 +133,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values: list[Optional[str]] = [] position_values: list[Optional[int]] = [] size_values: list[Optional[int]] = [] + source_id_values: list[Optional[str]] = [] null_mask: list[bool] = [] for v in values: @@ -129,6 +142,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(None) position_values.append(None) size_values.append(None) + source_id_values.append(None) null_mask.append(True) continue @@ -137,6 +151,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(v.uri) position_values.append(v.position) size_values.append(v.size) + source_id_values.append(v.source_id) null_mask.append(False) continue @@ -147,6 +162,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(v) position_values.append(None) size_values.append(None) + source_id_values.append(None) null_mask.append(False) continue @@ -155,6 +171,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(None) position_values.append(None) size_values.append(None) + source_id_values.append(None) null_mask.append(False) continue @@ -167,10 +184,11 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_arr = pa.array(uri_values, type=pa.utf8()) position_arr = pa.array(position_values, type=pa.uint64()) size_arr = pa.array(size_values, type=pa.uint64()) + source_id_arr = pa.array(source_id_values, type=pa.utf8()) mask_arr = pa.array(null_mask, type=pa.bool_()) storage = pa.StructArray.from_arrays( - [data_arr, uri_arr, position_arr, size_arr], - names=["data", "uri", "position", "size"], + [data_arr, uri_arr, position_arr, size_arr, source_id_arr], + names=["data", "uri", "position", "size", "source_id"], mask=mask_arr, ) return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value] diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 5a896d21c5d..be01fc16273 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -13,6 +13,7 @@ import pyarrow as pa import pytest from lance import Blob, BlobColumn, BlobFile, DatasetBasePath +from lance.blob import BlobType from lance.fragment import write_fragments lance_dataset_module = importlib.import_module("lance.dataset") @@ -39,6 +40,13 @@ def _commit_blob_fragments(dataset_uri, schema, fragments, initial_bases=None): return lance.LanceDataset.commit(dataset_uri, operation) +def _field_child_names(field): + data_type = field.type + if isinstance(data_type, pa.ExtensionType): + data_type = data_type.storage_type + return [child.name for child in data_type] + + def _external_blob_table(blob_path, payload=b"hello"): blob_path.parent.mkdir(parents=True, exist_ok=True) blob_path.write_bytes(payload) @@ -560,11 +568,85 @@ def test_blob_extension_write_external(tmp_path): ], ) def test_blob_from_uri_accepts_optional_slice_metadata(position, size): - blob = Blob.from_uri("file:///tmp/blob.bin", position=position, size=size) + blob = Blob.from_uri( + "file:///tmp/blob.bin", + position=position, + size=size, + source_id="image:1", + ) assert blob.uri == "file:///tmp/blob.bin" assert blob.position == position assert blob.size == size + assert blob.source_id == "image:1" + + +def test_blob_rejects_invalid_source_id(): + with pytest.raises(ValueError, match="source_id cannot be empty"): + Blob.from_bytes(b"hello", source_id="") + + with pytest.raises(ValueError, match="source_id cannot be set without data or uri"): + Blob(source_id="image:1") + + +def test_blob_type_ipc_round_trip_accepts_new_and_old_storage(): + new_table = pa.table( + {"blob": lance.blob_array([Blob.from_bytes(b"hi", source_id="s1")])} + ) + sink = pa.BufferOutputStream() + with pa.ipc.new_stream(sink, new_table.schema) as writer: + writer.write_table(new_table) + restored = pa.ipc.open_stream(sink.getvalue()).read_all() + assert len(restored.schema.field("blob").type.storage_type) == 5 + + old_storage_type = pa.struct( + [ + pa.field("data", pa.large_binary(), nullable=True), + pa.field("uri", pa.utf8(), nullable=True), + pa.field("position", pa.uint64(), nullable=True), + pa.field("size", pa.uint64(), nullable=True), + ] + ) + old_storage = pa.StructArray.from_arrays( + [ + pa.array([b"hi"], type=pa.large_binary()), + pa.array([None], type=pa.utf8()), + pa.array([None], type=pa.uint64()), + pa.array([None], type=pa.uint64()), + ], + names=["data", "uri", "position", "size"], + ) + old_ext = pa.ExtensionArray.from_storage(BlobType(old_storage_type), old_storage) + old_table = pa.table({"blob": old_ext}) + sink = pa.BufferOutputStream() + with pa.ipc.new_stream(sink, old_table.schema) as writer: + writer.write_table(old_table) + restored = pa.ipc.open_stream(sink.getvalue()).read_all() + assert len(restored.schema.field("blob").type.storage_type) == 4 + + +def test_blob_extension_write_rejects_source_id_without_payload(tmp_path): + storage = pa.StructArray.from_arrays( + [ + pa.array([None], type=pa.large_binary()), + pa.array([None], type=pa.utf8()), + pa.array([None], type=pa.uint64()), + pa.array([None], type=pa.uint64()), + pa.array(["image:1"], type=pa.utf8()), + ], + names=["data", "uri", "position", "size", "source_id"], + ) + blob_array = pa.ExtensionArray.from_storage(BlobType(), storage) + table = pa.table({"blob": blob_array}) + + with pytest.raises( + OSError, match="source_id cannot be set on a row without data or uri" + ): + lance.write_dataset( + table, + tmp_path / "test_ds_v2_source_id_without_payload", + data_storage_version="2.2", + ) def test_blob_extension_write_external_ingest(tmp_path): @@ -588,6 +670,109 @@ def test_blob_extension_write_external_ingest(tmp_path): assert f.read() == b"hello" +def test_blob_extension_write_source_id_shares_packed_descriptor(tmp_path): + payload = b"x" * (64 * 1024 + 1) + table = pa.table( + { + "blob": lance.blob_array( + [ + Blob.from_bytes(payload, source_id="image:1"), + Blob.from_bytes(payload, source_id="image:1"), + Blob.from_bytes(payload), + ] + ) + } + ) + ds = lance.write_dataset( + table, + tmp_path / "test_ds_v2_source_id", + data_storage_version="2.2", + ) + + assert "source_id" not in _field_child_names(ds.schema.field("blob")) + + desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) + assert "source_id" not in _field_child_names( + ds.to_table(columns=["blob"]).schema.field("blob") + ) + assert desc.field("kind")[0].as_py() == desc.field("kind")[1].as_py() + assert desc.field("blob_id")[0].as_py() == desc.field("blob_id")[1].as_py() + assert desc.field("position")[0].as_py() == desc.field("position")[1].as_py() + assert desc.field("size")[0].as_py() == desc.field("size")[1].as_py() + assert desc.field("position")[1].as_py() != desc.field("position")[2].as_py() + + blobs = ds.take_blobs("blob", indices=[0, 1, 2]) + assert [blob.read() for blob in blobs] == [payload, payload, payload] + + +def test_blob_extension_write_source_id_size_mismatch_rejected(tmp_path): + table = pa.table( + { + "blob": lance.blob_array( + [ + Blob.from_bytes(b"x" * (64 * 1024 + 1), source_id="image:1"), + Blob.from_bytes(b"x" * (64 * 1024 + 2), source_id="image:1"), + ] + ) + } + ) + + with pytest.raises(OSError, match="source_id 'image:1'.*first size=65537"): + lance.write_dataset( + table, + tmp_path / "test_ds_v2_source_id_mismatch", + data_storage_version="2.2", + ) + + +def test_blob_extension_write_source_id_inline_noop(tmp_path): + table = pa.table( + { + "blob": lance.blob_array( + [ + Blob.from_bytes(b"a", source_id="image:1"), + Blob.from_bytes(b"bb", source_id="image:1"), + ] + ) + } + ) + ds = lance.write_dataset( + table, + tmp_path / "test_ds_v2_source_id_inline", + data_storage_version="2.2", + ) + + blobs = ds.take_blobs("blob", indices=[0, 1]) + assert [blob.read() for blob in blobs] == [b"a", b"bb"] + + +def test_blob_extension_write_external_ingest_implicit_source_id_shares( + tmp_path, +): + payload = b"u" * (64 * 1024 + 1) + blob_path = tmp_path / "external_blob.bin" + blob_path.write_bytes(payload) + uri = blob_path.as_uri() + + table = pa.table( + {"blob": lance.blob_array([Blob.from_uri(uri), Blob.from_uri(uri)])} + ) + ds = lance.write_dataset( + table, + tmp_path / "test_ds_v2_external_ingest_source_id", + data_storage_version="2.2", + external_blob_mode="ingest", + ) + + desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) + assert desc.field("blob_id")[0].as_py() == desc.field("blob_id")[1].as_py() + assert desc.field("position")[0].as_py() == desc.field("position")[1].as_py() + + blob_path.unlink() + blobs = ds.take_blobs("blob", indices=[0, 1]) + assert [blob.read() for blob in blobs] == [payload, payload] + + def test_blob_extension_write_external_ingest_rejects_reference_only_options(tmp_path): blob_path = tmp_path / "external_blob.bin" blob_path.write_bytes(b"hello") diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 4c2665a3640..26e0b150feb 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -66,6 +66,10 @@ fn has_blob_v2_extension(field: &ArrowField) -> bool { .unwrap_or(false) } +fn is_known_blob_v2_write_field(field: &ArrowField) -> bool { + matches!(field.name().as_str(), "data" | "uri" | "position" | "size") +} + #[derive(Debug, Default)] pub enum NullabilityComparison { // If the nullabilities don't match then the fields don't match @@ -1088,10 +1092,12 @@ impl TryFrom<&ArrowField> for Field { .max(-1), None => -1, }; + let is_blob_v2 = has_blob_v2_extension(field); let children = match field.data_type() { DataType::Struct(children) => children .iter() + .filter(|f| !is_blob_v2 || is_known_blob_v2_write_field(f.as_ref())) .map(|f| Self::try_from(f.as_ref())) .collect::>()?, DataType::List(item) => vec![Self::try_from(item.as_ref())?], @@ -1143,8 +1149,6 @@ impl TryFrom<&ArrowField> for Field { let unenforced_clustering_key_position = metadata .get(LANCE_UNENFORCED_CLUSTERING_KEY_POSITION) .and_then(|s| s.parse::().ok()); - let is_blob_v2 = has_blob_v2_extension(field); - if is_blob_v2 { metadata .entry(ARROW_EXT_NAME_KEY.to_string()) @@ -1864,6 +1868,39 @@ mod tests { assert_eq!(field.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } + #[test] + fn blob_v2_schema_conversion_filters_write_only_children() { + let metadata = + HashMap::from([(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())]); + let field: Field = ArrowField::new( + "blob", + DataType::Struct( + vec![ + ArrowField::new("data", DataType::LargeBinary, true), + ArrowField::new("uri", DataType::Utf8, true), + ArrowField::new("position", DataType::UInt64, true), + ArrowField::new("size", DataType::UInt64, true), + ArrowField::new("source_id", DataType::Utf8, true), + ArrowField::new("unknown", DataType::Utf8, true), + ] + .into(), + ), + true, + ) + .with_metadata(metadata) + .try_into() + .unwrap(); + + assert_eq!( + field + .children + .iter() + .map(|child| child.name.as_str()) + .collect::>(), + vec!["data", "uri", "position", "size"] + ); + } + #[test] fn project_by_field_accepts_blob_descriptor_projection() { let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]); diff --git a/rust/lance/src/blob.rs b/rust/lance/src/blob.rs index 322bf67a04c..025b64d7f6e 100644 --- a/rust/lance/src/blob.rs +++ b/rust/lance/src/blob.rs @@ -3,13 +3,18 @@ //! Convenience builders for Lance blob v2 input columns. //! -//! Blob v2 expects a column shaped as `Struct` and -//! tagged with `ARROW:extension:name = "lance.blob.v2"`. This module offers a -//! type-safe builder to construct that struct without manually wiring metadata +//! Blob v2 expects a struct column tagged with +//! `ARROW:extension:name = "lance.blob.v2"`. Child fields are recognized by name. +//! This module offers a type-safe builder to construct that struct without +//! manually wiring metadata. use std::sync::Arc; -use arrow_array::{ArrayRef, StructArray, builder::LargeBinaryBuilder, builder::StringBuilder}; +use arrow_array::{ + ArrayRef, StructArray, + builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder}, + types::UInt64Type, +}; use arrow_buffer::NullBufferBuilder; use arrow_schema::{DataType, Field}; use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_V2_EXT_NAME}; @@ -18,24 +23,10 @@ use crate::{Error, Result}; /// Construct the Arrow field for a blob v2 column. /// -/// Blob v2 expects a column shaped as `Struct` and -/// tagged with `ARROW:extension:name = "lance.blob.v2"`. +/// The default Rust schema preserves the historical minimal shape: +/// `Struct`. pub fn blob_field(name: &str, nullable: bool) -> Field { - let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())] - .into_iter() - .collect(); - Field::new( - name, - DataType::Struct( - vec![ - Field::new("data", DataType::LargeBinary, true), - Field::new("uri", DataType::Utf8, true), - ] - .into(), - ), - nullable, - ) - .with_metadata(metadata) + blob_field_with_children(name, nullable, false, false) } /// Builder for blob v2 input struct columns. @@ -44,9 +35,14 @@ pub fn blob_field(name: &str, nullable: bool) -> Field { pub struct BlobArrayBuilder { data_builder: LargeBinaryBuilder, uri_builder: StringBuilder, + position_builder: PrimitiveBuilder, + size_builder: PrimitiveBuilder, + source_id_builder: StringBuilder, validity: NullBufferBuilder, expected_len: usize, len: usize, + has_position_size: bool, + has_source_id: bool, } impl BlobArrayBuilder { @@ -55,34 +51,86 @@ impl BlobArrayBuilder { Self { data_builder: LargeBinaryBuilder::with_capacity(capacity, 0), uri_builder: StringBuilder::with_capacity(capacity, 0), + position_builder: PrimitiveBuilder::::with_capacity(capacity), + size_builder: PrimitiveBuilder::::with_capacity(capacity), + source_id_builder: StringBuilder::with_capacity(capacity, 0), validity: NullBufferBuilder::new(capacity), expected_len: capacity, len: 0, + has_position_size: false, + has_source_id: false, } } + /// Construct an Arrow field matching the shape this builder will produce. + /// + /// Use this instead of [`blob_field`] when constructing arrays with + /// source identity or URI slice metadata. + pub fn field(&self, name: &str, nullable: bool) -> Field { + blob_field_with_children( + name, + nullable, + self.has_position_size || self.has_source_id, + self.has_source_id, + ) + } + /// Append a blob backed by raw bytes. pub fn push_bytes(&mut self, bytes: impl AsRef<[u8]>) -> Result<()> { self.ensure_capacity()?; - self.validity.append_non_null(); - self.data_builder.append_value(bytes); - self.uri_builder.append_null(); - self.len += 1; - Ok(()) + self.append_bytes(bytes.as_ref(), None) + } + + /// Append a blob backed by raw bytes and a user-provided source identity. + /// + /// Rows with the same `source_id` may share the same Lance-owned packed or + /// dedicated descriptor within one data file. Inline blobs ignore `source_id`. + pub fn push_bytes_with_source_id( + &mut self, + source_id: impl Into, + bytes: impl AsRef<[u8]>, + ) -> Result<()> { + self.ensure_capacity()?; + let source_id = source_id.into(); + validate_source_id(&source_id)?; + self.append_bytes(bytes.as_ref(), Some(&source_id)) } /// Append a blob referenced by URI. pub fn push_uri(&mut self, uri: impl Into) -> Result<()> { self.ensure_capacity()?; let uri = uri.into(); - if uri.is_empty() { - return Err(Error::invalid_input("URI cannot be empty")); - } - self.validity.append_non_null(); - self.data_builder.append_null(); - self.uri_builder.append_value(uri); - self.len += 1; - Ok(()) + self.append_uri(&uri, None, None, None) + } + + /// Append a sliced blob referenced by URI. + pub fn push_uri_with_slice( + &mut self, + uri: impl Into, + position: u64, + size: u64, + ) -> Result<()> { + self.ensure_capacity()?; + let uri = uri.into(); + self.append_uri(&uri, Some(position), Some(size), None) + } + + /// Append a URI blob and a user-provided source identity. + /// + /// In ingest mode, rows with the same `source_id` may share the same + /// Lance-owned packed or dedicated descriptor within one data file. + pub fn push_uri_with_source_id( + &mut self, + source_id: impl Into, + uri: impl Into, + position: Option, + size: Option, + ) -> Result<()> { + self.ensure_capacity()?; + let source_id = source_id.into(); + validate_source_id(&source_id)?; + let uri = uri.into(); + self.append_uri(&uri, position, size, Some(&source_id)) } /// Append an empty blob (inline, zero-length payload). @@ -91,6 +139,9 @@ impl BlobArrayBuilder { self.validity.append_non_null(); self.data_builder.append_value([]); self.uri_builder.append_null(); + self.position_builder.append_null(); + self.size_builder.append_null(); + self.source_id_builder.append_null(); self.len += 1; Ok(()) } @@ -101,6 +152,9 @@ impl BlobArrayBuilder { self.validity.append_null(); self.data_builder.append_null(); self.uri_builder.append_null(); + self.position_builder.append_null(); + self.size_builder.append_null(); + self.source_id_builder.append_null(); self.len += 1; Ok(()) } @@ -116,21 +170,90 @@ impl BlobArrayBuilder { let data = Arc::new(self.data_builder.finish()); let uri = Arc::new(self.uri_builder.finish()); + let position = Arc::new(self.position_builder.finish()); + let size = Arc::new(self.size_builder.finish()); + let source_id = Arc::new(self.source_id_builder.finish()); let validity = self.validity.finish(); + let include_position_size = self.has_position_size || self.has_source_id; - let struct_array = StructArray::try_new( - vec![ - Field::new("data", DataType::LargeBinary, true), - Field::new("uri", DataType::Utf8, true), - ] - .into(), - vec![data as ArrayRef, uri as ArrayRef], - validity, - )?; + let mut fields = vec![ + Field::new("data", DataType::LargeBinary, true), + Field::new("uri", DataType::Utf8, true), + ]; + let mut arrays = vec![data as ArrayRef, uri as ArrayRef]; + if include_position_size { + fields.push(Field::new("position", DataType::UInt64, true)); + fields.push(Field::new("size", DataType::UInt64, true)); + arrays.push(position as ArrayRef); + arrays.push(size as ArrayRef); + } + if self.has_source_id { + fields.push(Field::new("source_id", DataType::Utf8, true)); + arrays.push(source_id as ArrayRef); + } + + let struct_array = StructArray::try_new(fields.into(), arrays, validity)?; Ok(Arc::new(struct_array)) } + fn append_bytes(&mut self, bytes: &[u8], source_id: Option<&str>) -> Result<()> { + self.validity.append_non_null(); + self.data_builder.append_value(bytes); + self.uri_builder.append_null(); + self.position_builder.append_null(); + self.size_builder.append_null(); + match source_id { + Some(source_id) => { + self.has_source_id = true; + self.source_id_builder.append_value(source_id); + } + None => self.source_id_builder.append_null(), + } + self.len += 1; + Ok(()) + } + + fn append_uri( + &mut self, + uri: &str, + position: Option, + size: Option, + source_id: Option<&str>, + ) -> Result<()> { + if uri.is_empty() { + return Err(Error::invalid_input("URI cannot be empty")); + } + if position.is_some() != size.is_some() { + return Err(Error::invalid_input( + "URI blob must set both position and size, or neither", + )); + } + self.validity.append_non_null(); + self.data_builder.append_null(); + self.uri_builder.append_value(uri); + match position { + Some(position) => { + self.has_position_size = true; + self.position_builder.append_value(position); + } + None => self.position_builder.append_null(), + } + match size { + Some(size) => self.size_builder.append_value(size), + None => self.size_builder.append_null(), + } + match source_id { + Some(source_id) => { + self.has_source_id = true; + self.source_id_builder.append_value(source_id); + } + None => self.source_id_builder.append_null(), + } + self.len += 1; + Ok(()) + } + fn ensure_capacity(&self) -> Result<()> { if self.len >= self.expected_len { Err(Error::invalid_input("BlobArrayBuilder capacity exceeded")) @@ -140,6 +263,38 @@ impl BlobArrayBuilder { } } +fn blob_field_with_children( + name: &str, + nullable: bool, + include_position_size: bool, + include_source_id: bool, +) -> Field { + let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())] + .into_iter() + .collect(); + let mut fields = vec![ + Field::new("data", DataType::LargeBinary, true), + Field::new("uri", DataType::Utf8, true), + ]; + if include_position_size { + fields.push(Field::new("position", DataType::UInt64, true)); + fields.push(Field::new("size", DataType::UInt64, true)); + } + if include_source_id { + fields.push(Field::new("source_id", DataType::Utf8, true)); + } + + Field::new(name, DataType::Struct(fields.into()), nullable).with_metadata(metadata) +} + +fn validate_source_id(source_id: &str) -> Result<()> { + if source_id.is_empty() { + Err(Error::invalid_input("source_id cannot be empty")) + } else { + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -154,6 +309,12 @@ mod tests { field.metadata().get(ARROW_EXT_NAME_KEY).unwrap(), BLOB_V2_EXT_NAME ); + let DataType::Struct(fields) = field.data_type() else { + panic!("expected struct blob field"); + }; + assert_eq!(fields.len(), 2); + assert_eq!(fields[0].name(), "data"); + assert_eq!(fields[1].name(), "uri"); } #[test] @@ -169,6 +330,7 @@ mod tests { assert_eq!(arr.null_count(), 1); let struct_arr = arr.as_struct(); + assert_eq!(struct_arr.columns().len(), 2); let data = struct_arr.column(0).as_binary::(); let uri = struct_arr.column(1).as_string::(); @@ -194,4 +356,45 @@ mod tests { let err = b.push_uri("").unwrap_err(); assert!(err.to_string().contains("URI cannot be empty")); } + + #[test] + fn test_builder_source_id() { + let mut b = BlobArrayBuilder::new(2); + b.push_bytes_with_source_id("image:1", b"hi").unwrap(); + b.push_uri_with_source_id("image:2", "s3://bucket/key", Some(3), Some(4)) + .unwrap(); + + let arr = b.finish().unwrap(); + let struct_arr = arr.as_struct(); + assert_eq!(struct_arr.columns().len(), 5); + let uri = struct_arr.column(1).as_string::(); + let position = struct_arr.column(2).as_primitive::(); + let size = struct_arr.column(3).as_primitive::(); + let source_id = struct_arr.column(4).as_string::(); + + assert_eq!(source_id.value(0), "image:1"); + assert_eq!(uri.value(1), "s3://bucket/key"); + assert_eq!(position.value(1), 3); + assert_eq!(size.value(1), 4); + assert_eq!(source_id.value(1), "image:2"); + } + + #[test] + fn test_builder_slice_shape() { + let mut b = BlobArrayBuilder::new(1); + b.push_uri_with_slice("s3://bucket/key", 3, 4).unwrap(); + + let field = b.field("blob", true); + let arr = b.finish().unwrap(); + let struct_arr = arr.as_struct(); + + assert_eq!(struct_arr.columns().len(), 4); + let DataType::Struct(fields) = field.data_type() else { + panic!("expected struct blob field"); + }; + assert_eq!( + fields.iter().map(|f| f.name().as_str()).collect::>(), + vec!["data", "uri", "position", "size"] + ); + } } diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index f2c243367ce..55c1a2dcc76 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -208,6 +208,7 @@ pub struct BlobPreprocessor { blob_v2_cols: Vec, dedicated_thresholds: Vec, writer_metadata: Vec>, + source_shares: Vec>, external_base_resolver: Option>, allow_external_blob_outside_bases: bool, external_blob_mode: ExternalBlobMode, @@ -215,6 +216,14 @@ pub struct BlobPreprocessor { source_store_params: ObjectStoreParams, } +#[derive(Clone, Debug)] +struct SharedBlobDescriptor { + kind: BlobKind, + blob_id: u32, + position: Option, + size: u64, +} + /// A logical slice of an external blob that can be materialized or streamed into Lance-managed /// storage. struct ExternalBlobSource { @@ -333,6 +342,10 @@ impl BlobPreprocessor { .iter() .map(|field| field.metadata().clone()) .collect(); + let source_shares = fields + .iter() + .map(|_| HashMap::::new()) + .collect(); Self { object_store, data_dir, @@ -343,6 +356,7 @@ impl BlobPreprocessor { blob_v2_cols, dedicated_thresholds, writer_metadata, + source_shares, external_base_resolver, allow_external_blob_outside_bases, external_blob_mode, @@ -379,6 +393,76 @@ impl BlobPreprocessor { .await } + fn shared_descriptor( + &self, + column_idx: usize, + source_id: Option<&str>, + declared_size: Option, + ) -> Result> { + let Some(source_id) = source_id else { + return Ok(None); + }; + let Some(descriptor) = self.source_shares[column_idx].get(source_id) else { + return Ok(None); + }; + if let Some(declared_size) = declared_size + && declared_size != descriptor.size + { + return Err(Error::invalid_input(format!( + "Blob source_id '{}' has inconsistent sizes within one data file: first size={}, later size={}", + source_id, descriptor.size, declared_size + ))); + } + Ok(Some(descriptor.clone())) + } + + fn remember_shared_descriptor( + &mut self, + column_idx: usize, + source_id: Option<&str>, + descriptor: SharedBlobDescriptor, + ) { + if let Some(source_id) = source_id { + self.source_shares[column_idx] + .entry(source_id.to_string()) + .or_insert(descriptor); + } + } + + async fn write_lance_owned_descriptor( + &mut self, + column_idx: usize, + source_id: Option<&str>, + source: BlobWriteSource<'_>, + dedicated_threshold: usize, + ) -> Result { + let data_len = source.size() as u64; + if let Some(descriptor) = self.shared_descriptor(column_idx, source_id, Some(data_len))? { + return Ok(descriptor); + } + + let descriptor = if source.size() > dedicated_threshold { + let blob_id = self.next_blob_id(); + self.write_dedicated(blob_id, source).await?; + SharedBlobDescriptor { + kind: BlobKind::Dedicated, + blob_id, + position: None, + size: data_len, + } + } else { + let (blob_id, position) = self.write_packed(source).await?; + SharedBlobDescriptor { + kind: BlobKind::Packed, + blob_id, + position: Some(position), + size: data_len, + } + }; + self.remember_shared_descriptor(column_idx, source_id, descriptor.clone()); + Ok(descriptor) + } + async fn resolve_external_reference(&mut self, uri: &str) -> Result<(u32, String)> { let mapped = if let Some(resolver) = &self.external_base_resolver { resolver.resolve_external_uri(uri).await? @@ -486,6 +570,9 @@ impl BlobPreprocessor { let size_col = struct_arr .column_by_name("size") .map(|col| col.as_primitive::()); + let source_id_col = struct_arr + .column_by_name("source_id") + .map(|col| col.as_string::()); let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0); @@ -501,6 +588,15 @@ impl BlobPreprocessor { for i in 0..struct_arr.len() { if struct_arr.is_null(i) { + if source_id_col + .as_ref() + .map(|col| !col.is_null(i)) + .unwrap_or(false) + { + return Err(Error::invalid_input( + "Blob source_id cannot be set on a null row", + )); + } data_builder.append_null(); uri_builder.append_null(); blob_id_builder.append_null(); @@ -521,33 +617,39 @@ impl BlobPreprocessor { .map(|col| !col.is_null(i)) .unwrap_or(false); let data_len = if has_data { data_col.value(i).len() } else { 0 }; - - let dedicated_threshold = self.dedicated_thresholds[idx]; - if has_data && data_len > dedicated_threshold { - let blob_id = self.next_blob_id(); - self.write_dedicated(blob_id, BlobWriteSource::Bytes(data_col.value(i))) - .await?; - - kind_builder.append_value(BlobKind::Dedicated as u8); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_value(blob_id); - blob_size_builder.append_value(data_len as u64); - position_builder.append_null(); - continue; + let explicit_source_id = source_id_col + .as_ref() + .and_then(|col| (!col.is_null(i)).then(|| col.value(i))); + if let Some(source_id) = explicit_source_id { + if source_id.is_empty() { + return Err(Error::invalid_input("Blob source_id cannot be empty")); + } + if !has_data && !has_uri { + return Err(Error::invalid_input( + "Blob source_id cannot be set on a row without data or uri", + )); + } } - if has_data && data_len > INLINE_MAX { - let (pack_blob_id, position) = self - .write_packed(BlobWriteSource::Bytes(data_col.value(i))) + let dedicated_threshold = self.dedicated_thresholds[idx]; + if has_data && (data_len > dedicated_threshold || data_len > INLINE_MAX) { + let descriptor = self + .write_lance_owned_descriptor( + idx, + explicit_source_id, + BlobWriteSource::Bytes(data_col.value(i)), + dedicated_threshold, + ) .await?; - - kind_builder.append_value(BlobKind::Packed as u8); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_value(pack_blob_id); - blob_size_builder.append_value(data_len as u64); - position_builder.append_value(position); + append_lance_owned_descriptor( + &descriptor, + &mut kind_builder, + &mut data_builder, + &mut uri_builder, + &mut blob_id_builder, + &mut blob_size_builder, + &mut position_builder, + ); continue; } @@ -569,34 +671,47 @@ impl BlobPreprocessor { } else { None }; - let source = self.open_external_source(uri_val, position, size).await?; - let data_len = source.size(); - - if data_len > dedicated_threshold as u64 { - let blob_id = self.next_blob_id(); - self.write_dedicated(blob_id, BlobWriteSource::External(&source)) - .await?; - - kind_builder.append_value(BlobKind::Dedicated as u8); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_value(blob_id); - blob_size_builder.append_value(data_len); - position_builder.append_null(); + let implicit_source_id; + let source_id = if explicit_source_id.is_some() { + explicit_source_id + } else { + implicit_source_id = + Some(implicit_external_source_id(uri_val, position, size)); + implicit_source_id.as_deref() + }; + if let Some(descriptor) = self.shared_descriptor(idx, source_id, size)? { + append_lance_owned_descriptor( + &descriptor, + &mut kind_builder, + &mut data_builder, + &mut uri_builder, + &mut blob_id_builder, + &mut blob_size_builder, + &mut position_builder, + ); continue; } + let source = self.open_external_source(uri_val, position, size).await?; + let data_len = source.size(); - if data_len > INLINE_MAX as u64 { - let (pack_blob_id, position) = self - .write_packed(BlobWriteSource::External(&source)) + if data_len > dedicated_threshold as u64 || data_len > INLINE_MAX as u64 { + let descriptor = self + .write_lance_owned_descriptor( + idx, + source_id, + BlobWriteSource::External(&source), + dedicated_threshold, + ) .await?; - - kind_builder.append_value(BlobKind::Packed as u8); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_value(pack_blob_id); - blob_size_builder.append_value(data_len); - position_builder.append_value(position); + append_lance_owned_descriptor( + &descriptor, + &mut kind_builder, + &mut data_builder, + &mut uri_builder, + &mut blob_id_builder, + &mut blob_size_builder, + &mut position_builder, + ); continue; } @@ -700,6 +815,33 @@ impl BlobPreprocessor { } } +fn append_lance_owned_descriptor( + descriptor: &SharedBlobDescriptor, + kind_builder: &mut PrimitiveBuilder, + data_builder: &mut LargeBinaryBuilder, + uri_builder: &mut StringBuilder, + blob_id_builder: &mut PrimitiveBuilder, + blob_size_builder: &mut PrimitiveBuilder, + position_builder: &mut PrimitiveBuilder, +) { + kind_builder.append_value(descriptor.kind as u8); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_value(descriptor.blob_id); + blob_size_builder.append_value(descriptor.size); + match descriptor.position { + Some(position) => position_builder.append_value(position), + None => position_builder.append_null(), + } +} + +fn implicit_external_source_id(uri: &str, position: Option, size: Option) -> String { + match (position, size) { + (Some(position), Some(size)) => format!("{uri}#{position}:{size}"), + _ => format!("{uri}#full"), + } +} + fn dedicated_threshold_from_metadata(field: &arrow_schema::Field) -> usize { field .metadata() @@ -970,14 +1112,175 @@ fn shared_blob_source( /// A file-like object that represents a blob in a dataset #[derive(Debug)] pub struct BlobFile { - source: Arc, + plan: BlobReadPlan, state: Arc>, - position: u64, - size: u64, kind: BlobKind, uri: Option, } +#[derive(Clone, Copy, Debug)] +enum BlobReadDecoder { + Identity, +} + +/// Logical read plan for a blob descriptor. +#[derive(Clone, Debug)] +struct BlobReadPlan { + source: Arc, + logical_size: u64, + physical_ranges: Vec>, + decoder: BlobReadDecoder, +} + +impl BlobReadPlan { + fn new_identity( + source: Arc, + physical_range: Range, + logical_size: u64, + ) -> Self { + Self { + source, + logical_size, + physical_ranges: vec![physical_range], + decoder: BlobReadDecoder::Identity, + } + } + + fn source(&self) -> &Arc { + &self.source + } + + fn logical_size(&self) -> u64 { + self.logical_size + } + + fn physical_position(&self) -> u64 { + self.physical_ranges + .first() + .map(|range| range.start) + .unwrap_or(0) + } + + fn full_physical_ranges(&self) -> Vec> { + self.physical_ranges_for_logical(0..self.logical_size) + .expect("full logical blob range must be valid") + } + + fn physical_ranges_for_logical(&self, range: Range) -> Result>> { + if range.start > range.end { + return Err(Error::invalid_input(format!( + "Blob range start {} must be <= end {}", + range.start, range.end + ))); + } + if range.end > self.logical_size { + return Err(Error::invalid_input(format!( + "Blob range end {} exceeds blob size {}", + range.end, self.logical_size + ))); + } + if range.is_empty() { + return Ok(Vec::new()); + } + + match self.decoder { + BlobReadDecoder::Identity => { + let mut physical_ranges = Vec::new(); + let mut logical_offset = 0_u64; + for physical_range in &self.physical_ranges { + let segment_len = physical_range + .end + .checked_sub(physical_range.start) + .ok_or_else(|| { + Error::invalid_input(format!( + "Blob physical range start {} must be <= end {}", + physical_range.start, physical_range.end + )) + })?; + let segment_end = logical_offset.checked_add(segment_len).ok_or_else(|| { + Error::invalid_input(format!( + "Blob logical segment overflows u64: offset={}, len={}", + logical_offset, segment_len + )) + })?; + if range.start < segment_end && range.end > logical_offset { + let start = range.start.max(logical_offset); + let end = range.end.min(segment_end); + let physical_start = physical_range + .start + .checked_add(start - logical_offset) + .ok_or_else(|| { + Error::invalid_input(format!( + "Blob logical range overflowed physical position: base={} offset={}", + physical_range.start, + start - logical_offset + )) + })?; + let physical_end = physical_range + .start + .checked_add(end - logical_offset) + .ok_or_else(|| { + Error::invalid_input(format!( + "Blob logical range overflowed physical position: base={} offset={}", + physical_range.start, + end - logical_offset + )) + })?; + physical_ranges.push(physical_start..physical_end); + } + logical_offset = segment_end; + } + Ok(physical_ranges) + } + } + } + + async fn read_ranges(&self, ranges: &[Range]) -> Result> { + let mut request_ranges = Vec::new(); + let mut response = Vec::with_capacity(ranges.len()); + for (request_idx, range) in ranges.iter().enumerate() { + let physical_ranges = self.physical_ranges_for_logical(range.clone())?; + response.push(vec![Bytes::new(); physical_ranges.len()]); + for (range_idx, physical_range) in physical_ranges.into_iter().enumerate() { + if physical_range.is_empty() { + continue; + } + request_ranges.push((physical_range, request_idx, range_idx)); + } + } + if !request_ranges.is_empty() { + let chunks = self + .source + .read_ranges( + request_ranges + .iter() + .map(|(range, _, _)| range.clone()) + .collect(), + ) + .await?; + for ((_, request_idx, range_idx), chunk) in request_ranges.into_iter().zip(chunks) { + response[request_idx][range_idx] = chunk; + } + } + Ok(response.into_iter().map(concat_bytes).collect()) + } +} + +fn concat_bytes(chunks: Vec) -> Bytes { + match chunks.len() { + 0 => Bytes::new(), + 1 => chunks.into_iter().next().unwrap_or_default(), + _ => { + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let mut data = Vec::with_capacity(len); + for chunk in chunks { + data.extend_from_slice(chunk.as_ref()); + } + Bytes::from(data) + } + } +} + /// Base-aware physical location metadata used while resolving blob reads. /// /// This is cached per fragment so repeated rows from the same fragment do not @@ -999,9 +1302,7 @@ impl BlobFile { uri: Option, ) -> Self { Self { - source, - position, - size, + plan: BlobReadPlan::new_identity(source, position..(position + size), size), kind, uri, state: Arc::new(Mutex::new(BlobFileState::Open(0))), @@ -1148,34 +1449,6 @@ impl BlobFile { } } - fn read_phys_range(&self, range: Range) -> Result> { - if range.start > range.end { - return Err(Error::invalid_input(format!( - "Blob range start {} must be <= end {}", - range.start, range.end - ))); - } - if range.end > self.size { - return Err(Error::invalid_input(format!( - "Blob range end {} exceeds blob size {}", - range.end, self.size - ))); - } - let start = self.position.checked_add(range.start).ok_or_else(|| { - Error::invalid_input(format!( - "Blob range start overflowed physical position: base={} offset={}", - self.position, range.start - )) - })?; - let end = self.position.checked_add(range.end).ok_or_else(|| { - Error::invalid_input(format!( - "Blob range end overflowed physical position: base={} offset={}", - self.position, range.end - )) - })?; - Ok(start..end) - } - /// Read a byte range relative to the beginning of this blob without changing the cursor. /// /// The provided range is interpreted in blob-local coordinates, not object @@ -1194,12 +1467,7 @@ impl BlobFile { /// be reordered, coalesced, or split for efficiency. pub async fn read_ranges(&self, ranges: &[Range]) -> Result> { self.ensure_open().await?; - let physical_ranges = ranges - .iter() - .cloned() - .map(|range| self.read_phys_range(range)) - .collect::>>()?; - self.source.read_ranges(physical_ranges).await + self.plan.read_ranges(ranges).await } /// Read the entire blob file from the current cursor position @@ -1208,20 +1476,16 @@ impl BlobFile { /// After this call the cursor will be pointing to the end of /// the file. pub async fn read(&self) -> Result { - let size = self.size; - let source = self.source.clone(); - let position = self.position; + let plan = self.plan.clone(); + let size = plan.logical_size(); self.do_with_cursor(move |cursor| { - let source = source.clone(); + let plan = plan.clone(); async move { if cursor >= size { return Ok((size, Bytes::new())); } - let physical = (position + cursor)..(position + size); - Ok(( - size, - source.read_ranges(vec![physical]).await?.pop().unwrap(), - )) + let data = plan.read_ranges(&[cursor..size]).await?.pop().unwrap(); + Ok((size, data)) } }) .await @@ -1232,19 +1496,20 @@ impl BlobFile { /// After this call the cursor will be pointing to the end of /// the read data. pub async fn read_up_to(&self, len: usize) -> Result { - let size = self.size; - let source = self.source.clone(); - let position = self.position; + let plan = self.plan.clone(); + let size = plan.logical_size(); self.do_with_cursor(move |cursor| { - let source = source.clone(); + let plan = plan.clone(); async move { if cursor >= size || len == 0 { return Ok((size.min(cursor), Bytes::new())); } let read_size = len.min((size - cursor) as usize) as u64; - let start = position + cursor; - let end = start + read_size; - let data = source.read_ranges(vec![start..end]).await?.pop().unwrap(); + let data = plan + .read_ranges(&[cursor..(cursor + read_size)]) + .await? + .pop() + .unwrap(); Ok((cursor + read_size, data)) } }) @@ -1278,15 +1543,15 @@ impl BlobFile { /// Return the size of the blob file in bytes pub fn size(&self) -> u64 { - self.size + self.plan.logical_size() } pub fn position(&self) -> u64 { - self.position + self.plan.physical_position() } pub fn data_path(&self) -> &Path { - &self.source.path + &self.plan.source().path } pub fn kind(&self) -> BlobKind { @@ -1498,12 +1763,12 @@ struct BlobEntry { struct PlannedBlobRead { selection_index: usize, row_address: u64, - physical_range: Range, + physical_ranges: Vec>, } /// One per-source read plan emitted by `read_blobs`. #[derive(Debug)] -struct BlobReadPlan { +struct SourceBlobReadPlan { source_key: BlobSourceKey, source: Arc, reads: Vec, @@ -1562,19 +1827,19 @@ fn into_read_blob(blob: IndexedReadBlob) -> ReadBlob { /// Group selected blobs by physical source and sort each group's ranges by /// physical offset before handing them to the file scheduler. -fn plan_blob_read_plans(entries: Vec) -> Vec { +fn plan_blob_read_plans(entries: Vec) -> Vec { let mut plan_indices = HashMap::::new(); - let mut plans = Vec::::new(); + let mut plans = Vec::::new(); for entry in entries { - let source_key = BlobSourceKey::new(&entry.file.source); + let source_key = BlobSourceKey::new(entry.file.plan.source()); let plan_index = if let Some(plan_index) = plan_indices.get(&source_key) { *plan_index } else { let plan_index = plans.len(); - plans.push(BlobReadPlan { + plans.push(SourceBlobReadPlan { source_key: source_key.clone(), - source: entry.file.source.clone(), + source: entry.file.plan.source().clone(), reads: Vec::new(), }); plan_indices.insert(source_key.clone(), plan_index); @@ -1584,7 +1849,7 @@ fn plan_blob_read_plans(entries: Vec) -> Vec { plans[plan_index].reads.push(PlannedBlobRead { selection_index: entry.selection_index, row_address: entry.row_address, - physical_range: entry.file.position..(entry.file.position + entry.file.size), + physical_ranges: entry.file.plan.full_physical_ranges(), }); } @@ -1597,10 +1862,12 @@ fn plan_blob_read_plans(entries: Vec) -> Vec { for plan in &mut plans { plan.reads.sort_by(|left, right| { - left.physical_range + let left_range = left.physical_ranges.first().cloned().unwrap_or(0..0); + let right_range = right.physical_ranges.first().cloned().unwrap_or(0..0); + left_range .start - .cmp(&right.physical_range.start) - .then_with(|| left.physical_range.end.cmp(&right.physical_range.end)) + .cmp(&right_range.start) + .then_with(|| left_range.end.cmp(&right_range.end)) .then_with(|| left.selection_index.cmp(&right.selection_index)) }); } @@ -1610,29 +1877,57 @@ fn plan_blob_read_plans(entries: Vec) -> Vec { /// Execute one per-source blob read plan with a single scheduler submission. async fn execute_blob_read_plan( - task: BlobReadPlan, + task: SourceBlobReadPlan, execution: Arc, ) -> Result> { - let ranges = task + let total_ranges = task .reads .iter() - .map(|read| read.physical_range.clone()) + .map(|read| read.physical_ranges.len()) + .sum(); + let mut request_ranges = Vec::with_capacity(total_ranges); + let mut response = task + .reads + .iter() + .map(|read| vec![Bytes::new(); read.physical_ranges.len()]) .collect::>(); + for (read_idx, read) in task.reads.iter().enumerate() { + for (range_idx, range) in read.physical_ranges.iter().enumerate() { + if range.is_empty() { + continue; + } + request_ranges.push((range.clone(), read_idx, range_idx)); + } + } let scheduler = execution.scheduler_for(&task.source); let file_scheduler = scheduler .open_file(&task.source.path, &task.source.file_size) .await?; - let priority = ranges[0].start; - let bytes = file_scheduler.submit_request(ranges, priority).await?; + if !request_ranges.is_empty() { + request_ranges.sort_by_key(|(range, _, _)| (range.start, range.end)); + let priority = request_ranges[0].0.start; + let bytes = file_scheduler + .submit_request( + request_ranges + .iter() + .map(|(range, _, _)| range.clone()) + .collect::>(), + priority, + ) + .await?; + for ((_, read_idx, range_idx), data) in request_ranges.into_iter().zip(bytes) { + response[read_idx][range_idx] = data; + } + } Ok(task .reads .into_iter() - .zip(bytes) - .map(|(read, data)| IndexedReadBlob { + .zip(response) + .map(|(read, chunks)| IndexedReadBlob { selection_index: read.selection_index, row_address: read.row_address, - data, + data: concat_bytes(chunks), }) .collect()) } @@ -3045,6 +3340,224 @@ mod tests { assert_eq!(second.as_ref(), b"world"); } + #[tokio::test] + async fn test_blob_v2_source_id_shares_packed_descriptor() { + let test_dir = TempStrDir::default(); + let payload = vec![7_u8; super::INLINE_MAX + 1024]; + + let mut blob_builder = BlobArrayBuilder::new(3); + blob_builder + .push_bytes_with_source_id("image:1", &payload) + .unwrap(); + blob_builder + .push_bytes_with_source_id("image:1", &payload) + .unwrap(); + blob_builder.push_bytes(&payload).unwrap(); + let blob_field = blob_builder.field("blob", true); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); + + let schema = Arc::new(Schema::new(vec![blob_field])); + let batch = RecordBatch::try_new(schema.clone(), vec![blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let dataset = Arc::new( + Dataset::write( + reader, + &test_dir, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await + .unwrap(), + ); + let manifest_blob_field = dataset.schema().field("blob").unwrap(); + assert!( + !manifest_blob_field + .children + .iter() + .any(|f| f.name == "source_id") + ); + + let desc_batch = dataset + .scan() + .project(&["blob"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let desc_schema = desc_batch.schema(); + let DataType::Struct(desc_fields) = desc_schema.field(0).data_type() else { + panic!("expected blob descriptor struct"); + }; + assert!(!desc_fields.iter().any(|f| f.name() == "source_id")); + let desc = desc_batch.column(0).as_struct(); + let kinds = desc + .column_by_name("kind") + .unwrap() + .as_primitive::(); + let positions = desc + .column_by_name("position") + .unwrap() + .as_primitive::(); + let sizes = desc + .column_by_name("size") + .unwrap() + .as_primitive::(); + let blob_ids = desc + .column_by_name("blob_id") + .unwrap() + .as_primitive::(); + + assert_eq!(kinds.value(0), BlobKind::Packed as u8); + assert_eq!(kinds.value(1), BlobKind::Packed as u8); + assert_eq!(blob_ids.value(0), blob_ids.value(1)); + assert_eq!(positions.value(0), positions.value(1)); + assert_eq!(sizes.value(0), sizes.value(1)); + assert_ne!(positions.value(1), positions.value(2)); + + let blobs = dataset + .take_blobs_by_indices(&[0, 1, 2], "blob") + .await + .unwrap(); + for blob in blobs { + assert_eq!(blob.read().await.unwrap().as_ref(), payload.as_slice()); + } + } + + #[tokio::test] + async fn test_blob_v2_source_id_size_mismatch_rejected() { + let test_dir = TempStrDir::default(); + + let mut blob_builder = BlobArrayBuilder::new(2); + blob_builder + .push_bytes_with_source_id("image:1", vec![1_u8; super::INLINE_MAX + 1]) + .unwrap(); + blob_builder + .push_bytes_with_source_id("image:1", vec![2_u8; super::INLINE_MAX + 2]) + .unwrap(); + let blob_field = blob_builder.field("blob", true); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); + + let schema = Arc::new(Schema::new(vec![blob_field])); + let batch = RecordBatch::try_new(schema.clone(), vec![blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let err = Dataset::write( + reader, + &test_dir, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await + .unwrap_err(); + + let message = err.to_string(); + assert!(message.contains("source_id 'image:1'")); + assert!(message.contains("first size=65537")); + assert!(message.contains("later size=65538")); + } + + #[tokio::test] + async fn test_blob_v2_source_id_inline_noop() { + let test_dir = TempStrDir::default(); + + let mut blob_builder = BlobArrayBuilder::new(2); + blob_builder + .push_bytes_with_source_id("image:1", b"a") + .unwrap(); + blob_builder + .push_bytes_with_source_id("image:1", b"bb") + .unwrap(); + let blob_field = blob_builder.field("blob", true); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); + + let schema = Arc::new(Schema::new(vec![blob_field])); + let batch = RecordBatch::try_new(schema.clone(), vec![blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let dataset = Arc::new( + Dataset::write( + reader, + &test_dir, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await + .unwrap(), + ); + + let blobs = dataset + .take_blobs_by_indices(&[0, 1], "blob") + .await + .unwrap(); + assert_eq!(blobs[0].kind(), BlobKind::Inline); + assert_eq!(blobs[1].kind(), BlobKind::Inline); + assert_eq!(blobs[0].read().await.unwrap().as_ref(), b"a"); + assert_eq!(blobs[1].read().await.unwrap().as_ref(), b"bb"); + } + + #[tokio::test] + async fn test_blob_v2_external_ingest_implicit_source_id_shares_descriptor() { + let dataset_dir = TempDir::default(); + let external_dir = TempDir::default(); + let external_path = external_dir.std_path().join("external.bin"); + let payload = vec![9_u8; super::INLINE_MAX + 1024]; + std::fs::write(&external_path, &payload).unwrap(); + let external_uri = format!("file://{}", external_path.display()); + + let mut blob_builder = BlobArrayBuilder::new(2); + blob_builder.push_uri(external_uri.clone()).unwrap(); + blob_builder.push_uri(external_uri).unwrap(); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); + let schema = Arc::new(Schema::new(vec![blob_field("blob", true)])); + let batch = RecordBatch::try_new(schema.clone(), vec![blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + + let dataset = Arc::new( + Dataset::write( + reader, + &dataset_dir.path_str(), + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + external_blob_mode: ExternalBlobMode::Ingest, + ..Default::default() + }), + ) + .await + .unwrap(), + ); + + let desc_batch = dataset + .scan() + .project(&["blob"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let desc = desc_batch.column(0).as_struct(); + let positions = desc + .column_by_name("position") + .unwrap() + .as_primitive::(); + let blob_ids = desc + .column_by_name("blob_id") + .unwrap() + .as_primitive::(); + assert_eq!(blob_ids.value(0), blob_ids.value(1)); + assert_eq!(positions.value(0), positions.value(1)); + + std::fs::remove_file(external_path).unwrap(); + let blobs = dataset + .take_blobs_by_indices(&[0, 1], "blob") + .await + .unwrap(); + assert_eq!(blobs[0].read().await.unwrap().as_ref(), payload.as_slice()); + assert_eq!(blobs[1].read().await.unwrap().as_ref(), payload.as_slice()); + } + #[tokio::test] async fn test_blob_file_read_empty_range_returns_empty_bytes() { let store = reject_empty_range_store();