Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions python/python/tests/test_fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,3 +835,37 @@ def test_fragment_delete_rows(tmp_path: Path):
frag.delete_rows([100])
with pytest.raises(ValueError, match="out of range"):
frag.delete_rows([0, 50, 1000])


def test_fragment_create_with_json_column(tmp_path):
    """LanceFragment.create must round-trip an Arrow JSON extension column.

    Regression test: the single-fragment create path used to skip the
    Arrow JSON (Utf8) -> Lance JSON (JSONB LargeBinary) conversion that
    write_dataset/write_fragments perform. The raw UTF-8 string bytes were
    therefore written into a column whose schema declared JSONB, and reads
    mis-decoded those bytes and returned garbage.
    """
    uids = ["a", "b", "c", "d"]
    raw_payloads = ['{"x":1}', '{"x":2}', '{"y":3}', '{"y":4}']
    table = pa.table(
        {
            "uid": pa.array(uids, type=pa.utf8()),
            "payload": pa.array(raw_payloads, type=pa.json_()),
        }
    )

    # Write one fragment directly, then commit it as the whole dataset.
    fragment = LanceFragment.create(tmp_path, table)
    dataset = LanceDataset.commit(
        tmp_path, LanceOperation.Overwrite(table.schema, [fragment])
    )

    read_back = dataset.to_table()
    assert read_back.column("uid").to_pylist() == uids

    # Compare parsed JSON values rather than the serialized text.
    decoded = [json.loads(p) for p in read_back.column("payload").to_pylist()]
    assert decoded == [
        {"x": 1},
        {"x": 2},
        {"y": 3},
        {"y": 4},
    ]
7 changes: 7 additions & 0 deletions rust/lance/src/dataset/fragment/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use uuid::Uuid;

use crate::Result;
use crate::dataset::builder::DatasetBuilder;
use crate::dataset::utils::SchemaAdapter;
use crate::dataset::write::{do_write_fragments, validate_and_resolve_target_bases};
use crate::dataset::{DATA_DIR, Dataset, ReadParams, WriteMode, WriteParams};

Expand Down Expand Up @@ -106,6 +107,12 @@ impl<'a> FragmentCreateBuilder<'a> {
id: Option<u64>,
) -> Result<Fragment> {
let (stream, schema) = self.get_stream_and_schema(Box::new(source)).await?;
// Convert Arrow JSON columns (`arrow.json`, stored as Utf8) into Lance JSON
// (`lance.json`, stored as JSONB-encoded LargeBinary) before writing. The
// multi-fragment and dataset write paths perform this through `do_write_fragments`;
// the single-fragment create path must do the same or the raw UTF-8 string bytes
// would be written into a column whose schema declares JSONB, corrupting reads.
let stream = SchemaAdapter::new(stream.schema()).to_physical_stream(stream);
self.write_impl(stream, schema, id).await
}

Expand Down
Loading