diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index a72a7613241..37400f8e31c 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2443,6 +2443,66 @@ def test_merge_insert_subcols(tmp_path: Path): assert dataset.to_table().sort_by("a") == expected +def test_merge_insert_subcols_with_json_column(tmp_path: Path): + """Test merge_insert with subschema update on a JSON extension type column. + + Previously this would fail with: + 'Incorrect datatype for StructArray field, expected Utf8 got LargeBinary' + because the update_fragments path didn't handle the Arrow JSON ↔ Lance JSON + type mismatch during interleave. + """ + import json + + json_type = pa.json_() + initial_data = pa.table( + { + "id": pa.array([1, 2, 3, 4, 5], type=pa.int64()), + "name": pa.array(["a", "b", "c", "d", "e"], type=pa.utf8()), + "score": pa.array([10, 20, 30, 40, 50], type=pa.int64()), + "meta": pa.array( + ['{"x":1}', '{"x":2}', '{"x":3}', '{"x":4}', '{"x":5}'], + type=json_type, + ), + } + ) + dataset = lance.write_dataset(initial_data, tmp_path / "merge_json_subcols") + + # Subschema update: only provide id (key) + meta (JSON column to update) + new_values = pa.table( + { + "id": pa.array([2, 4], type=pa.int64()), + "meta": pa.array( + ['{"updated":true,"id":2}', '{"updated":true,"id":4}'], + type=json_type, + ), + } + ) + + # This should NOT raise a type mismatch error + dataset.merge_insert("id").when_matched_update_all().execute(new_values) + + # Verify results + result = dataset.to_table().sort_by("id") + ids = result.column("id").to_pylist() + scores = result.column("score").to_pylist() + metas = result.column("meta").to_pylist() + + # Score column (not in update) should be preserved + assert scores == [10, 20, 30, 40, 50] + + # Meta column should be updated for id=2 and id=4 + for id_val, meta_val in zip(ids, metas): + parsed = json.loads(meta_val) if isinstance(meta_val, str) else meta_val + if id_val in (2, 4): + assert parsed.get("updated") is True, ( + f"id={id_val} should have updated meta, got {meta_val}" + ) + else: + assert "x" in str(parsed), ( + f"id={id_val} should have original meta, got {meta_val}" + ) + + def test_merge_insert_defaults_to_pk_when_on_omitted(tmp_path): base_dir = tmp_path / "merge_insert_pk_default" diff --git a/python/python/tests/test_fragment.py b/python/python/tests/test_fragment.py index bdbbac96ac5..274498a2878 100644 --- a/python/python/tests/test_fragment.py +++ b/python/python/tests/test_fragment.py @@ -835,3 +835,66 @@ def test_fragment_delete_rows(tmp_path: Path): frag.delete_rows([100]) with pytest.raises(ValueError, match="out of range"): frag.delete_rows([0, 50, 1000]) + + +def test_fragment_update_columns_with_json_column(tmp_path): + """Test that fragment update_columns works with Arrow JSON extension type. + + Previously this would fail with a type mismatch error because the + HashJoiner didn't convert Arrow JSON (Utf8) to Lance JSON (LargeBinary). + """ + # Create initial dataset with a JSON extension type column + json_type = pa.json_() + data = pa.table( + { + "id": pa.array([1, 2, 3, 4, 5], type=pa.int64()), + "name": pa.array(["a", "b", "c", "d", "e"], type=pa.utf8()), + "meta": pa.array( + ['{"x":1}', '{"x":2}', '{"x":3}', '{"x":4}', '{"x":5}'], + type=json_type, + ), + } + ) + dataset_uri = tmp_path / "test_update_cols_json" + dataset = lance.write_dataset(data, dataset_uri) + + # Prepare update data: update the JSON column for some rows + update_data = pa.table( + { + "_rowid": pa.array([1, 3], type=pa.uint64()), + "meta": pa.array( + ['{"updated":true,"id":2}', '{"updated":true,"id":4}'], + type=json_type, + ), + } + ) + + # This should NOT raise a type mismatch error + fragment = dataset.get_fragment(0) + updated_fragment, fields_modified = fragment.update_columns(update_data) + + assert len(fields_modified) > 0 + + # Commit and verify + op = LanceOperation.Update( + updated_fragments=[updated_fragment], + fields_modified=fields_modified, + ) + updated_dataset = lance.LanceDataset.commit( + str(dataset_uri), op, read_version=dataset.version + ) + + result = updated_dataset.to_table() + ids = result.column("id").to_pylist() + metas = result.column("meta").to_pylist() + + for i, (id_val, meta_val) in enumerate(zip(ids, metas)): + meta = json.loads(meta_val) if isinstance(meta_val, str) else meta_val + if id_val == 2 or id_val == 4: + assert "updated" in meta_val or meta.get("updated") is True, ( + f"id={id_val} should be updated, got {meta_val}" + ) + else: + assert "x" in meta_val or "x" in str(meta), ( + f"id={id_val} should have original value, got {meta_val}" + ) diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 4ea1b51f073..e25c6f769b9 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -22,6 +22,7 @@ use datafusion::logical_expr::Expr; use datafusion::scalar::ScalarValue; use futures::future::{BoxFuture, try_join_all}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, join, stream}; +use lance_arrow::json::{convert_json_columns, has_json_fields, is_arrow_json_field}; use lance_arrow::{RecordBatchExt, SchemaExt}; use lance_core::datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions}; use lance_core::utils::address::RowAddress; @@ -1892,6 +1893,17 @@ impl FileFragment { ) .await?; // Hash join: rows matched on the right-hand stream rewrite columns; track physical offsets via `_rowaddr`. + // Convert Arrow JSON columns (Utf8) to Lance JSON (LargeBinary) in the right stream + // so they match the physical storage format read from the fragment's left batch. + let right_stream: Box = if right_schema + .fields() + .iter() + .any(|f| is_arrow_json_field(f) || has_json_fields(f)) + { + Box::new(JsonConvertingReader::new(right_stream)) + } else { + right_stream + }; let joiner = Arc::new(HashJoiner::try_new(right_stream, right_on).await?); let mut matched_offsets = RoaringBitmap::new(); let frag_id_u32 = u32::try_from(self.metadata.id).map_err(|_| { @@ -2932,6 +2944,59 @@ impl FragmentReader { } } +/// A wrapper around a `RecordBatchReader` that converts Arrow JSON columns +/// (Utf8/LargeUtf8 with `arrow.json` extension) to Lance JSON columns +/// (LargeBinary with `lance.json` extension / JSONB format). +/// +/// This is needed when user-provided data contains Arrow JSON fields but the +/// dataset stores them in Lance's JSONB binary format. +struct JsonConvertingReader { + inner: Box, + schema: arrow_schema::SchemaRef, +} + +impl JsonConvertingReader { + fn new(inner: Box) -> Self { + use lance_arrow::json::arrow_json_to_lance_json; + + // Build the converted schema (Arrow JSON fields → Lance JSON fields) + let orig_schema = inner.schema(); + let new_fields: Vec = orig_schema + .fields() + .iter() + .map(|f| { + if is_arrow_json_field(f) || has_json_fields(f) { + Arc::new(arrow_json_to_lance_json(f)) + } else { + Arc::clone(f) + } + }) + .collect(); + let schema = Arc::new(arrow_schema::Schema::new_with_metadata( + new_fields, + orig_schema.metadata().clone(), + )); + + Self { inner, schema } + } +} + +impl Iterator for JsonConvertingReader { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|result| result.and_then(|batch| convert_json_columns(&batch))) + } +} + +impl RecordBatchReader for JsonConvertingReader { + fn schema(&self) -> arrow_schema::SchemaRef { + self.schema.clone() + } +} + #[cfg(test)] mod tests { use arrow_arith::numeric::mul; @@ -4406,4 +4471,80 @@ mod tests { assert_io_eq!(stats, read_iops, 1); assert_io_lt!(stats, read_bytes, 4096); } + + #[tokio::test] + async fn test_update_columns_with_json_extension_type() { + use arrow_array::UInt64Array; + use lance_arrow::ARROW_EXT_NAME_KEY; + use lance_arrow::json::ARROW_JSON_EXT_NAME; + use lance_core::ROW_ID; + use std::collections::HashMap; + + // Create a dataset with an Arrow JSON extension column + let test_dir = TempStrDir::default(); + let mut json_metadata = HashMap::new(); + json_metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int64, false), + ArrowField::new("name", DataType::Utf8, true), + ArrowField::new("meta", DataType::Utf8, true).with_metadata(json_metadata.clone()), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + Arc::new(StringArray::from(vec![ + r#"{"x":1}"#, + r#"{"x":2}"#, + r#"{"x":3}"#, + r#"{"x":4}"#, + r#"{"x":5}"#, + ])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let dataset = Dataset::write(reader, test_dir.as_ref(), None) + .await + .unwrap(); + + // Build the right stream with Arrow JSON column (Utf8 + arrow.json extension) + // Only update rows with row_id 1 and 3 + let update_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(ROW_ID, DataType::UInt64, false), + ArrowField::new("meta", DataType::Utf8, true).with_metadata(json_metadata), + ])); + let update_batch = RecordBatch::try_new( + update_schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![1, 3])), + Arc::new(StringArray::from(vec![ + r#"{"updated":true,"id":2}"#, + r#"{"updated":true,"id":4}"#, + ])), + ], + ) + .unwrap(); + let right_stream: Box = Box::new(RecordBatchIterator::new( + vec![Ok(update_batch)], + update_schema, + )); + + // Perform update_columns - this should NOT fail with type mismatch + // Previously this would error with: + // "It is not possible to interleave arrays of different data types (Utf8 and LargeBinary)" + let mut fragment = dataset.get_fragment(0).unwrap(); + let (updated_fragment, fields_modified) = fragment + .update_columns(right_stream, ROW_ID, ROW_ID) + .await + .unwrap(); + + // Verify the operation produced valid results + assert!(!fields_modified.is_empty()); + assert!(!updated_fragment.files.is_empty()); + } } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 3889bdc66d5..ce08d5a4adf 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -97,6 +97,7 @@ use futures::{ Stream, StreamExt, TryStreamExt, stream::{self}, }; +use lance_arrow::json::{convert_json_columns, has_json_fields, is_arrow_json_field}; use lance_arrow::{RecordBatchExt, SchemaExt, interleave_batches}; use lance_core::datatypes::NullabilityComparison; use lance_core::utils::address::RowAddress; @@ -1134,8 +1135,25 @@ impl MergeInsertJob { // will be updates. let mut source_batches = Vec::with_capacity(batches.len() + 1); source_batches.push(batches[0].clone()); // placeholder for source data + // Check once whether JSON conversion is needed (all batches share the same schema) + let needs_json_conversion = batches[0] + .schema() + .as_ref() + .without_column(ROW_ADDR) + .fields() + .iter() + .any(|f| is_arrow_json_field(f) || has_json_fields(f)); for batch in &batches { - source_batches.push(batch.drop_column(ROW_ADDR)?); + let dropped = batch.drop_column(ROW_ADDR)?; + // Convert Arrow JSON columns (Utf8) to Lance JSON (LargeBinary) + // so source_batches are in physical format, matching what the + // updater reads from the fragment. + if needs_json_conversion { + source_batches + .push(convert_json_columns(&dropped).map_err(Error::from)?); + } else { + source_batches.push(dropped); + } } // This function is here to help rustc with lifetimes. @@ -10065,4 +10083,134 @@ MergeInsert: on=[id], when_matched=DoNothing, when_not_matched=InsertAll, when_n "Newly written merge-insert data files should be cleaned up on apply_deletions failure" ); } + + #[tokio::test] + async fn test_merge_insert_subschema_with_json_columns() { + use lance_arrow::ARROW_EXT_NAME_KEY; + use lance_arrow::json::ARROW_JSON_EXT_NAME; + + // Create a dataset with an Arrow JSON extension column + let test_dir = TempStrDir::default(); + let mut json_metadata = HashMap::new(); + json_metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Int64, true), + Field::new("meta", DataType::Utf8, true).with_metadata(json_metadata.clone()), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50])), + Arc::new(StringArray::from(vec![ + r#"{"x":1}"#, + r#"{"x":2}"#, + r#"{"x":3}"#, + r#"{"x":4}"#, + r#"{"x":5}"#, + ])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let dataset = Arc::new( + Dataset::write(reader, test_dir.as_ref(), None) + .await + .unwrap(), + ); + + // Perform a subschema merge_insert: only update "meta" column (JSON type) + // This exercises the update_fragments path with interleave_batches + let update_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("meta", DataType::Utf8, true).with_metadata(json_metadata), + ])); + let update_batch = RecordBatch::try_new( + update_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![2, 4])), + Arc::new(StringArray::from(vec![ + r#"{"updated":true,"id":2}"#, + r#"{"updated":true,"id":4}"#, + ])), + ], + ) + .unwrap(); + let update_reader: Box = Box::new(RecordBatchIterator::new( + vec![Ok(update_batch)], + update_schema, + )); + let stream = reader_to_stream(update_reader); + + // Execute merge_insert with subschema (only id + meta columns) + let mut builder = + MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]).unwrap(); + builder.when_matched(WhenMatched::UpdateAll); + builder.when_not_matched(WhenNotMatched::DoNothing); + let job = builder.try_build().unwrap(); + let (updated_dataset, stats) = job.execute(stream).await.unwrap(); + + // Verify: the merge should not fail with type mismatch + assert_eq!(stats.num_updated_rows, 2); + + // Read back and verify the JSON column was updated correctly + let batches = updated_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let result = concat_batches(&batches[0].schema(), &batches).unwrap(); + assert_eq!(result.num_rows(), 5); + + // Verify the "score" column (not in update) is preserved, and "meta" updated + let ids = result + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let scores = result + .column_by_name("score") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let metas = result + .column_by_name("meta") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..5 { + let id = ids.value(i); + let score = scores.value(i); + let meta = metas.value(i); + // score = id * 10, regardless of row order + assert_eq!(score, id * 10, "id={} score mismatch", id); + if id == 2 || id == 4 { + assert!( + meta.contains("updated"), + "id={} should have updated meta, got: {}", + id, + meta + ); + } else { + assert!( + meta.contains("\"x\""), + "id={} should have original meta, got: {}", + id, + meta + ); + } + } + } }