Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2443,6 +2443,66 @@ def test_merge_insert_subcols(tmp_path: Path):
assert dataset.to_table().sort_by("a") == expected


def test_merge_insert_subcols_with_json_column(tmp_path: Path):
"""Test merge_insert with subschema update on a JSON extension type column.

Previously this would fail with:
'Incorrect datatype for StructArray field, expected Utf8 got LargeBinary'
because the update_fragments path didn't handle the Arrow JSON ↔ Lance JSON
type mismatch during interleave.
"""
import json

json_type = pa.json_()
initial_data = pa.table(
{
"id": pa.array([1, 2, 3, 4, 5], type=pa.int64()),
"name": pa.array(["a", "b", "c", "d", "e"], type=pa.utf8()),
"score": pa.array([10, 20, 30, 40, 50], type=pa.int64()),
"meta": pa.array(
['{"x":1}', '{"x":2}', '{"x":3}', '{"x":4}', '{"x":5}'],
type=json_type,
),
}
)
dataset = lance.write_dataset(initial_data, tmp_path / "merge_json_subcols")

# Subschema update: only provide id (key) + meta (JSON column to update)
new_values = pa.table(
{
"id": pa.array([2, 4], type=pa.int64()),
"meta": pa.array(
['{"updated":true,"id":2}', '{"updated":true,"id":4}'],
type=json_type,
),
}
)

# This should NOT raise a type mismatch error
dataset.merge_insert("id").when_matched_update_all().execute(new_values)

# Verify results
result = dataset.to_table().sort_by("id")
ids = result.column("id").to_pylist()
scores = result.column("score").to_pylist()
metas = result.column("meta").to_pylist()

# Score column (not in update) should be preserved
assert scores == [10, 20, 30, 40, 50]

# Meta column should be updated for id=2 and id=4
for id_val, meta_val in zip(ids, metas):
parsed = json.loads(meta_val) if isinstance(meta_val, str) else meta_val
if id_val in (2, 4):
assert parsed.get("updated") is True, (
f"id={id_val} should have updated meta, got {meta_val}"
)
else:
assert "x" in str(parsed), (
f"id={id_val} should have original meta, got {meta_val}"
)


def test_merge_insert_defaults_to_pk_when_on_omitted(tmp_path):
base_dir = tmp_path / "merge_insert_pk_default"

Expand Down
63 changes: 63 additions & 0 deletions python/python/tests/test_fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,3 +835,66 @@ def test_fragment_delete_rows(tmp_path: Path):
frag.delete_rows([100])
with pytest.raises(ValueError, match="out of range"):
frag.delete_rows([0, 50, 1000])


def test_fragment_update_columns_with_json_column(tmp_path):
"""Test that fragment update_columns works with Arrow JSON extension type.

Previously this would fail with a type mismatch error because the
HashJoiner didn't convert Arrow JSON (Utf8) to Lance JSON (LargeBinary).
"""
# Create initial dataset with a JSON extension type column
json_type = pa.json_()
data = pa.table(
{
"id": pa.array([1, 2, 3, 4, 5], type=pa.int64()),
"name": pa.array(["a", "b", "c", "d", "e"], type=pa.utf8()),
"meta": pa.array(
['{"x":1}', '{"x":2}', '{"x":3}', '{"x":4}', '{"x":5}'],
type=json_type,
),
}
)
dataset_uri = tmp_path / "test_update_cols_json"
dataset = lance.write_dataset(data, dataset_uri)

# Prepare update data: update the JSON column for some rows
update_data = pa.table(
{
"_rowid": pa.array([1, 3], type=pa.uint64()),
"meta": pa.array(
['{"updated":true,"id":2}', '{"updated":true,"id":4}'],
type=json_type,
),
}
)

# This should NOT raise a type mismatch error
fragment = dataset.get_fragment(0)
updated_fragment, fields_modified = fragment.update_columns(update_data)

assert len(fields_modified) > 0

# Commit and verify
op = LanceOperation.Update(
updated_fragments=[updated_fragment],
fields_modified=fields_modified,
)
updated_dataset = lance.LanceDataset.commit(
str(dataset_uri), op, read_version=dataset.version
)

result = updated_dataset.to_table()
ids = result.column("id").to_pylist()
metas = result.column("meta").to_pylist()

for i, (id_val, meta_val) in enumerate(zip(ids, metas)):
meta = json.loads(meta_val) if isinstance(meta_val, str) else meta_val
if id_val == 2 or id_val == 4:
assert "updated" in meta_val or meta.get("updated") is True, (
f"id={id_val} should be updated, got {meta_val}"
)
else:
assert "x" in meta_val or "x" in str(meta), (
f"id={id_val} should have original value, got {meta_val}"
)
141 changes: 141 additions & 0 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::logical_expr::Expr;
use datafusion::scalar::ScalarValue;
use futures::future::{BoxFuture, try_join_all};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, join, stream};
use lance_arrow::json::{convert_json_columns, has_json_fields, is_arrow_json_field};
use lance_arrow::{RecordBatchExt, SchemaExt};
use lance_core::datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions};
use lance_core::utils::address::RowAddress;
Expand Down Expand Up @@ -1892,6 +1893,17 @@ impl FileFragment {
)
.await?;
// Hash join: rows matched on the right-hand stream rewrite columns; track physical offsets via `_rowaddr`.
// Convert Arrow JSON columns (Utf8) to Lance JSON (LargeBinary) in the right stream
// so they match the physical storage format read from the fragment's left batch.
let right_stream: Box<dyn RecordBatchReader + Send> = if right_schema
.fields()
.iter()
.any(|f| is_arrow_json_field(f) || has_json_fields(f))
{
Box::new(JsonConvertingReader::new(right_stream))
} else {
right_stream
};
let joiner = Arc::new(HashJoiner::try_new(right_stream, right_on).await?);
let mut matched_offsets = RoaringBitmap::new();
let frag_id_u32 = u32::try_from(self.metadata.id).map_err(|_| {
Expand Down Expand Up @@ -2932,6 +2944,59 @@ impl FragmentReader {
}
}

/// A wrapper around a `RecordBatchReader` that converts Arrow JSON columns
/// (Utf8/LargeUtf8 with `arrow.json` extension) to Lance JSON columns
/// (LargeBinary with `lance.json` extension / JSONB format).
///
/// This is needed when user-provided data contains Arrow JSON fields but the
/// dataset stores them in Lance's JSONB binary format.
struct JsonConvertingReader {
inner: Box<dyn RecordBatchReader + Send>,
schema: arrow_schema::SchemaRef,
}

impl JsonConvertingReader {
fn new(inner: Box<dyn RecordBatchReader + Send>) -> Self {
use lance_arrow::json::arrow_json_to_lance_json;

// Build the converted schema (Arrow JSON fields → Lance JSON fields)
let orig_schema = inner.schema();
let new_fields: Vec<arrow_schema::FieldRef> = orig_schema
.fields()
.iter()
.map(|f| {
if is_arrow_json_field(f) || has_json_fields(f) {
Arc::new(arrow_json_to_lance_json(f))
} else {
Arc::clone(f)
}
})
.collect();
let schema = Arc::new(arrow_schema::Schema::new_with_metadata(
new_fields,
orig_schema.metadata().clone(),
));

Self { inner, schema }
}
}

impl Iterator for JsonConvertingReader {
type Item = std::result::Result<RecordBatch, arrow_schema::ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.map(|result| result.and_then(|batch| convert_json_columns(&batch)))
}
}

impl RecordBatchReader for JsonConvertingReader {
fn schema(&self) -> arrow_schema::SchemaRef {
self.schema.clone()
}
}

#[cfg(test)]
mod tests {
use arrow_arith::numeric::mul;
Expand Down Expand Up @@ -4406,4 +4471,80 @@ mod tests {
assert_io_eq!(stats, read_iops, 1);
assert_io_lt!(stats, read_bytes, 4096);
}

#[tokio::test]
async fn test_update_columns_with_json_extension_type() {
use arrow_array::UInt64Array;
use lance_arrow::ARROW_EXT_NAME_KEY;
use lance_arrow::json::ARROW_JSON_EXT_NAME;
use lance_core::ROW_ID;
use std::collections::HashMap;

// Create a dataset with an Arrow JSON extension column
let test_dir = TempStrDir::default();
let mut json_metadata = HashMap::new();
json_metadata.insert(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
);
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("id", DataType::Int64, false),
ArrowField::new("name", DataType::Utf8, true),
ArrowField::new("meta", DataType::Utf8, true).with_metadata(json_metadata.clone()),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
Arc::new(StringArray::from(vec![
r#"{"x":1}"#,
r#"{"x":2}"#,
r#"{"x":3}"#,
r#"{"x":4}"#,
r#"{"x":5}"#,
])),
],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
let dataset = Dataset::write(reader, test_dir.as_ref(), None)
.await
.unwrap();

// Build the right stream with Arrow JSON column (Utf8 + arrow.json extension)
// Only update rows with row_id 1 and 3
let update_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(ROW_ID, DataType::UInt64, false),
ArrowField::new("meta", DataType::Utf8, true).with_metadata(json_metadata),
]));
let update_batch = RecordBatch::try_new(
update_schema.clone(),
vec![
Arc::new(UInt64Array::from(vec![1, 3])),
Arc::new(StringArray::from(vec![
r#"{"updated":true,"id":2}"#,
r#"{"updated":true,"id":4}"#,
])),
],
)
.unwrap();
let right_stream: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
vec![Ok(update_batch)],
update_schema,
));

// Perform update_columns - this should NOT fail with type mismatch
// Previously this would error with:
// "It is not possible to interleave arrays of different data types (Utf8 and LargeBinary)"
let mut fragment = dataset.get_fragment(0).unwrap();
let (updated_fragment, fields_modified) = fragment
.update_columns(right_stream, ROW_ID, ROW_ID)
.await
.unwrap();

// Verify the operation produced valid results
assert!(!fields_modified.is_empty());
assert!(!updated_fragment.files.is_empty());
}
}
Loading
Loading